mirror of
https://github.com/httprunner/httprunner.git
synced 2026-06-06 08:19:45 +08:00
refactor: move hrp/ to root folder
This commit is contained in:
266
step_rendezvous.go
Normal file
266
step_rendezvous.go
Normal file
@@ -0,0 +1,266 @@
|
||||
package hrp
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// StepRendezvous implements IStep interface.
|
||||
type StepRendezvous struct {
|
||||
StepConfig
|
||||
Rendezvous *Rendezvous `json:"rendezvous,omitempty" yaml:"rendezvous,omitempty"`
|
||||
}
|
||||
|
||||
func (s *StepRendezvous) Name() string {
|
||||
if s.StepName != "" {
|
||||
return s.StepName
|
||||
}
|
||||
return s.Rendezvous.Name
|
||||
}
|
||||
|
||||
func (s *StepRendezvous) Type() StepType {
|
||||
return StepTypeRendezvous
|
||||
}
|
||||
|
||||
func (s *StepRendezvous) Config() *StepConfig {
|
||||
return &StepConfig{
|
||||
StepName: s.StepName,
|
||||
Variables: s.Variables,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StepRendezvous) Run(r *SessionRunner) (*StepResult, error) {
|
||||
rendezvous := s.Rendezvous
|
||||
log.Info().
|
||||
Str("name", rendezvous.Name).
|
||||
Float32("percent", rendezvous.Percent).
|
||||
Int64("number", rendezvous.Number).
|
||||
Int64("timeout", rendezvous.Timeout).
|
||||
Msg("rendezvous")
|
||||
|
||||
stepResult := &StepResult{
|
||||
Name: rendezvous.Name,
|
||||
StepType: StepTypeRendezvous,
|
||||
Success: true,
|
||||
}
|
||||
|
||||
// pass current rendezvous if already released, activate rendezvous sequentially after spawn done
|
||||
if rendezvous.isReleased() || !isPreRendezvousAllReleased(rendezvous, &r.caseRunner.TestCase) || !rendezvous.isSpawnDone() {
|
||||
return stepResult, nil
|
||||
}
|
||||
|
||||
// activate the rendezvous only once during each cycle
|
||||
rendezvous.once.Do(func() {
|
||||
close(rendezvous.activateChan)
|
||||
})
|
||||
|
||||
// check current cnt using double check lock before updating to avoid negative WaitGroup counter
|
||||
if atomic.LoadInt64(&rendezvous.cnt) < rendezvous.Number {
|
||||
rendezvous.lock.Lock()
|
||||
if atomic.LoadInt64(&rendezvous.cnt) < rendezvous.Number {
|
||||
atomic.AddInt64(&rendezvous.cnt, 1)
|
||||
rendezvous.wg.Done()
|
||||
rendezvous.timerResetChan <- struct{}{}
|
||||
}
|
||||
rendezvous.lock.Unlock()
|
||||
}
|
||||
|
||||
// block until current rendezvous released
|
||||
<-rendezvous.releaseChan
|
||||
return stepResult, nil
|
||||
}
|
||||
|
||||
func isPreRendezvousAllReleased(rendezvous *Rendezvous, testCase *TestCase) bool {
|
||||
for _, step := range testCase.TestSteps {
|
||||
if step.Type() != StepTypeRendezvous {
|
||||
continue
|
||||
}
|
||||
preRendezvous := step.(*StepRendezvous).Rendezvous
|
||||
if preRendezvous == nil {
|
||||
continue
|
||||
}
|
||||
// meet current rendezvous, all previous rendezvous released, return true
|
||||
if preRendezvous == rendezvous {
|
||||
return true
|
||||
}
|
||||
if !preRendezvous.isReleased() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// WithUserNumber sets the user number needed to release the current rendezvous
|
||||
func (s *StepRendezvous) WithUserNumber(number int64) *StepRendezvous {
|
||||
s.Rendezvous.Number = number
|
||||
return s
|
||||
}
|
||||
|
||||
// WithUserPercent sets the user percent needed to release the current rendezvous
|
||||
func (s *StepRendezvous) WithUserPercent(percent float32) *StepRendezvous {
|
||||
s.Rendezvous.Percent = percent
|
||||
return s
|
||||
}
|
||||
|
||||
// WithTimeout sets the timeout of duration between each user arriving at the current rendezvous
|
||||
func (s *StepRendezvous) WithTimeout(timeout int64) *StepRendezvous {
|
||||
s.Rendezvous.Timeout = timeout
|
||||
return s
|
||||
}
|
||||
|
||||
const (
|
||||
defaultRendezvousTimeout int64 = 5000
|
||||
defaultRendezvousPercent float32 = 1.0
|
||||
)
|
||||
|
||||
type Rendezvous struct {
|
||||
Name string `json:"name" yaml:"name"` // required
|
||||
Percent float32 `json:"percent,omitempty" yaml:"percent,omitempty"` // default to 1(100%)
|
||||
Number int64 `json:"number,omitempty" yaml:"number,omitempty"`
|
||||
Timeout int64 `json:"timeout,omitempty" yaml:"timeout,omitempty"` // milliseconds
|
||||
cnt int64
|
||||
releasedFlag uint32
|
||||
spawnDoneFlag uint32
|
||||
wg sync.WaitGroup
|
||||
timerResetChan chan struct{}
|
||||
activateChan chan struct{}
|
||||
releaseChan chan struct{}
|
||||
once *sync.Once
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func (r *Rendezvous) reset() {
|
||||
r.cnt = 0
|
||||
r.releasedFlag = 0
|
||||
r.wg.Add(int(r.Number))
|
||||
// timerResetChan channel will not be closed, thus init only once
|
||||
if r.timerResetChan == nil {
|
||||
r.timerResetChan = make(chan struct{})
|
||||
}
|
||||
r.activateChan = make(chan struct{})
|
||||
r.releaseChan = make(chan struct{})
|
||||
r.once = new(sync.Once)
|
||||
}
|
||||
|
||||
func (r *Rendezvous) isSpawnDone() bool {
|
||||
return atomic.LoadUint32(&r.spawnDoneFlag) == 1
|
||||
}
|
||||
|
||||
func (r *Rendezvous) SetSpawnDone() {
|
||||
atomic.StoreUint32(&r.spawnDoneFlag, 1)
|
||||
}
|
||||
|
||||
func (r *Rendezvous) isReleased() bool {
|
||||
return atomic.LoadUint32(&r.releasedFlag) == 1
|
||||
}
|
||||
|
||||
func (r *Rendezvous) setReleased() {
|
||||
atomic.StoreUint32(&r.releasedFlag, 1)
|
||||
}
|
||||
|
||||
func (r *Rendezvous) updateRendezvousNumber(number int64) {
|
||||
atomic.StoreInt64(&r.Number, int64(float32(number)*r.Percent))
|
||||
}
|
||||
|
||||
func InitRendezvous(testcase *TestCase, total int64) []*Rendezvous {
|
||||
var rendezvousList []*Rendezvous
|
||||
for _, s := range testcase.TestSteps {
|
||||
step := s.(*StepRendezvous)
|
||||
if step.Rendezvous == nil {
|
||||
continue
|
||||
}
|
||||
rendezvous := step.Rendezvous
|
||||
|
||||
// either number or percent should be correctly put, otherwise set to default (total)
|
||||
if rendezvous.Number == 0 && rendezvous.Percent > 0 && rendezvous.Percent <= defaultRendezvousPercent {
|
||||
rendezvous.Number = int64(rendezvous.Percent * float32(total))
|
||||
} else if rendezvous.Number > 0 && rendezvous.Number <= total && rendezvous.Percent == 0 {
|
||||
rendezvous.Percent = float32(rendezvous.Number) / float32(total)
|
||||
} else {
|
||||
log.Warn().
|
||||
Str("name", rendezvous.Name).
|
||||
Int64("default number", total).
|
||||
Float32("default percent", defaultRendezvousPercent).
|
||||
Msg("rendezvous parameter not defined or error, set to default value")
|
||||
rendezvous.Number = total
|
||||
rendezvous.Percent = defaultRendezvousPercent
|
||||
}
|
||||
|
||||
if rendezvous.Timeout <= 0 {
|
||||
rendezvous.Timeout = defaultRendezvousTimeout
|
||||
}
|
||||
|
||||
rendezvous.reset()
|
||||
rendezvousList = append(rendezvousList, rendezvous)
|
||||
}
|
||||
return rendezvousList
|
||||
}
|
||||
|
||||
func WaitRendezvous(rendezvousList []*Rendezvous, b IBoomer) {
|
||||
if rendezvousList != nil {
|
||||
lastRendezvous := rendezvousList[len(rendezvousList)-1]
|
||||
for _, rendezvous := range rendezvousList {
|
||||
go waitSingleRendezvous(rendezvous, rendezvousList, lastRendezvous, b)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitSingleRendezvous(rendezvous *Rendezvous, rendezvousList []*Rendezvous, lastRendezvous *Rendezvous, b IBoomer) {
|
||||
for {
|
||||
// cycle start: block current checking until current rendezvous activated
|
||||
<-rendezvous.activateChan
|
||||
stop := make(chan struct{})
|
||||
timeout := time.Duration(rendezvous.Timeout) * time.Millisecond
|
||||
timer := time.NewTimer(timeout)
|
||||
go func() {
|
||||
defer close(stop)
|
||||
rendezvous.wg.Wait()
|
||||
}()
|
||||
for !rendezvous.isReleased() {
|
||||
select {
|
||||
case <-rendezvous.timerResetChan:
|
||||
timer.Reset(timeout)
|
||||
case <-stop:
|
||||
rendezvous.setReleased()
|
||||
close(rendezvous.releaseChan)
|
||||
log.Info().
|
||||
Str("name", rendezvous.Name).
|
||||
Float32("percent", rendezvous.Percent).
|
||||
Int64("number", rendezvous.Number).
|
||||
Int64("timeout(ms)", rendezvous.Timeout).
|
||||
Int64("cnt", rendezvous.cnt).
|
||||
Str("reason", "rendezvous release condition satisfied").
|
||||
Msg("rendezvous released")
|
||||
case <-timer.C:
|
||||
rendezvous.setReleased()
|
||||
close(rendezvous.releaseChan)
|
||||
log.Info().
|
||||
Str("name", rendezvous.Name).
|
||||
Float32("percent", rendezvous.Percent).
|
||||
Int64("number", rendezvous.Number).
|
||||
Int64("timeout(ms)", rendezvous.Timeout).
|
||||
Int64("cnt", rendezvous.cnt).
|
||||
Str("reason", "time's up").
|
||||
Msg("rendezvous released")
|
||||
}
|
||||
}
|
||||
// cycle end: reset all previous rendezvous after last rendezvous released
|
||||
// otherwise, block current checker until the last rendezvous end
|
||||
if rendezvous == lastRendezvous {
|
||||
for _, r := range rendezvousList {
|
||||
r.reset()
|
||||
// dynamic adjustment based on the number of concurrent users
|
||||
r.updateRendezvousNumber(int64(b.GetSpawnCount()))
|
||||
}
|
||||
} else {
|
||||
<-lastRendezvous.releaseChan
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type IBoomer interface {
|
||||
GetSpawnCount() int
|
||||
}
|
||||
Reference in New Issue
Block a user