package hrp

import (
	"sync"
	"sync/atomic"
	"time"

	"github.com/rs/zerolog/log"
)

// StepRendezvous implements IStep interface.
// It wraps a Rendezvous: a point in the test case where running users wait
// for each other until enough of them have arrived or a timeout elapses.
type StepRendezvous struct {
	StepConfig
	Rendezvous *Rendezvous `json:"rendezvous,omitempty" yaml:"rendezvous,omitempty"`
}

// Name returns the explicit step name if set, otherwise the rendezvous name.
// It returns an empty string when neither is available.
func (s *StepRendezvous) Name() string {
	if s.StepName != "" {
		return s.StepName
	}
	if s.Rendezvous == nil {
		return ""
	}
	return s.Rendezvous.Name
}

// Type reports the step type, which is always StepTypeRendezvous.
func (s *StepRendezvous) Type() StepType {
	return StepTypeRendezvous
}

// Config returns a new StepConfig carrying the step name and variables.
func (s *StepRendezvous) Config() *StepConfig {
	return &StepConfig{
		StepName:  s.StepName,
		Variables: s.Variables,
	}
}

// Run registers the calling user at the rendezvous and blocks until the
// rendezvous is released. The step is passed through without blocking when
// the rendezvous is already released, when an earlier rendezvous of the test
// case is not yet released, or when spawning has not finished.
// The returned StepResult is always successful and the error is always nil.
func (s *StepRendezvous) Run(r *SessionRunner) (*StepResult, error) {
	stepResult := &StepResult{
		Name:     s.Name(),
		StepType: s.Type(),
		Success:  true,
	}

	rendezvous := s.Rendezvous
	if rendezvous == nil {
		// InitRendezvous and isPreRendezvousAllReleased skip such steps as
		// well, so there is nothing to wait for.
		log.Warn().
			Str("step", s.Name()).
			Msg("rendezvous not configured, step skipped")
		return stepResult, nil
	}

	log.Info().
		Str("name", rendezvous.Name).
		Float32("percent", rendezvous.Percent).
		Int64("number", atomic.LoadInt64(&rendezvous.Number)).
		Int64("timeout", rendezvous.Timeout).
		Msg("rendezvous")

	// pass current rendezvous if already released, activate rendezvous sequentially after spawn done
	if rendezvous.isReleased() ||
		!isPreRendezvousAllReleased(rendezvous, &r.caseRunner.TestCase) ||
		!rendezvous.isSpawnDone() {
		return stepResult, nil
	}

	// activate the rendezvous only once during each cycle
	rendezvous.once.Do(func() {
		close(rendezvous.activateChan)
	})

	// check current cnt using double check lock before updating to avoid negative WaitGroup counter
	if atomic.LoadInt64(&rendezvous.cnt) < atomic.LoadInt64(&rendezvous.Number) {
		rendezvous.lock.Lock()
		if atomic.LoadInt64(&rendezvous.cnt) < atomic.LoadInt64(&rendezvous.Number) &&
			atomic.LoadInt64(&rendezvous.pending) > 0 {
			atomic.AddInt64(&rendezvous.cnt, 1)
			atomic.AddInt64(&rendezvous.pending, -1)
			rendezvous.wg.Done()
			// Tell the waiter to restart its timeout. The waiter may already
			// have released the rendezvous and stopped receiving, so also give
			// up once released instead of blocking with the lock held.
			select {
			case rendezvous.timerResetChan <- struct{}{}:
			case <-rendezvous.releaseChan:
			}
		}
		rendezvous.lock.Unlock()
	}

	// block until current rendezvous released
	<-rendezvous.releaseChan
	return stepResult, nil
}

// isPreRendezvousAllReleased reports whether every rendezvous that appears
// before the given one in testCase.TestSteps has been released. It also
// returns true when the given rendezvous is not found among the steps.
func isPreRendezvousAllReleased(rendezvous *Rendezvous, testCase *TestCase) bool {
	for _, step := range testCase.TestSteps {
		if step.Type() != StepTypeRendezvous {
			continue
		}
		rendezvousStep, ok := step.(*StepRendezvous)
		if !ok || rendezvousStep == nil || rendezvousStep.Rendezvous == nil {
			continue
		}
		preRendezvous := rendezvousStep.Rendezvous
		// meet current rendezvous, all previous rendezvous released, return true
		if preRendezvous == rendezvous {
			return true
		}
		if !preRendezvous.isReleased() {
			return false
		}
	}
	return true
}

// WithUserNumber sets the user number needed to release the current rendezvous
func (s *StepRendezvous) WithUserNumber(number int64) *StepRendezvous {
	s.Rendezvous.Number = number
	return s
}

// WithUserPercent sets the user percent needed to release the current rendezvous
func (s *StepRendezvous) WithUserPercent(percent float32) *StepRendezvous {
	s.Rendezvous.Percent = percent
	return s
}

// WithTimeout sets the timeout of duration between each user arriving at the current rendezvous
func (s *StepRendezvous) WithTimeout(timeout int64) *StepRendezvous {
	s.Rendezvous.Timeout = timeout
	return s
}

const (
	defaultRendezvousTimeout int64   = 5000
	defaultRendezvousPercent float32 = 1.0
)

// Rendezvous holds the configuration and the per-cycle runtime state of a
// rendezvous point. A cycle starts with reset and ends when the rendezvous
// is released, either because Number users arrived or because no user
// arrived within Timeout milliseconds.
type Rendezvous struct {
	Name    string  `json:"name" yaml:"name"`                           // required
	Percent float32 `json:"percent,omitempty" yaml:"percent,omitempty"` // default to 1(100%)
	Number  int64   `json:"number,omitempty" yaml:"number,omitempty"`
	Timeout int64   `json:"timeout,omitempty" yaml:"timeout,omitempty"` // milliseconds

	cnt            int64          // users counted in the current cycle
	pending        int64          // WaitGroup slots added by reset and not yet consumed
	releasedFlag   uint32         // 1 once the current cycle is released
	spawnDoneFlag  uint32         // 1 once SetSpawnDone has been called
	wg             sync.WaitGroup // reaches zero when Number users have arrived
	timerResetChan chan struct{}  // signals the waiter to restart its timeout
	activateChan   chan struct{}  // closed by the first user arriving in a cycle
	releaseChan    chan struct{}  // closed when the current cycle is released
	once           *sync.Once     // guards closing activateChan
	lock           sync.Mutex     // serializes counting of arriving users
}

// reset prepares the rendezvous for a new cycle: it zeroes the arrival
// counter, arms the WaitGroup with Number slots and installs fresh channels.
func (r *Rendezvous) reset() {
	number := atomic.LoadInt64(&r.Number)
	atomic.StoreInt64(&r.cnt, 0)
	atomic.StoreInt64(&r.pending, number)
	r.wg.Add(int(number))
	// timerResetChan channel will not be closed, thus init only once
	if r.timerResetChan == nil {
		r.timerResetChan = make(chan struct{})
	}
	r.activateChan = make(chan struct{})
	r.releaseChan = make(chan struct{})
	r.once = new(sync.Once)
	// Clear the released flag last: Run treats the rendezvous as passable
	// while the flag is set, so users only start using the new cycle once all
	// of its state is in place.
	atomic.StoreUint32(&r.releasedFlag, 0)
}

// isSpawnDone reports whether SetSpawnDone has been called.
func (r *Rendezvous) isSpawnDone() bool {
	return atomic.LoadUint32(&r.spawnDoneFlag) == 1
}

// SetSpawnDone marks spawning as finished, which allows Run to start
// blocking users at this rendezvous.
func (r *Rendezvous) SetSpawnDone() {
	atomic.StoreUint32(&r.spawnDoneFlag, 1)
}

// isReleased reports whether the current cycle has been released.
func (r *Rendezvous) isReleased() bool {
	return atomic.LoadUint32(&r.releasedFlag) == 1
}

// setReleased marks the current cycle as released.
func (r *Rendezvous) setReleased() {
	atomic.StoreUint32(&r.releasedFlag, 1)
}

// release marks the current cycle as released, wakes every user blocked in
// Run by closing releaseChan, and logs the given reason.
// It must be called at most once per cycle.
func (r *Rendezvous) release(reason string) {
	r.setReleased()
	close(r.releaseChan)
	log.Info().
		Str("name", r.Name).
		Float32("percent", r.Percent).
		Int64("number", atomic.LoadInt64(&r.Number)).
		Int64("timeout(ms)", r.Timeout).
		Int64("cnt", atomic.LoadInt64(&r.cnt)).
		Str("reason", reason).
		Msg("rendezvous released")
}

// drainPending consumes the WaitGroup slots that were not used in the
// current cycle. Without it a timed-out cycle would leave the WaitGroup
// above zero, so the goroutine blocked in wg.Wait would never return and the
// next reset would stack its slots on top of the leftovers.
// Call it only after release, so that a user blocked inside the lock in Run
// can get out via releaseChan.
func (r *Rendezvous) drainPending() {
	r.lock.Lock()
	defer r.lock.Unlock()
	if left := atomic.SwapInt64(&r.pending, 0); left > 0 {
		r.wg.Add(-int(left))
	}
}

// updateRendezvousNumber recomputes Number as Percent of the given user count.
func (r *Rendezvous) updateRendezvousNumber(number int64) {
	atomic.StoreInt64(&r.Number, int64(float32(number)*r.Percent))
}

// InitRendezvous collects the rendezvous of all rendezvous steps in testcase,
// normalizes their parameters against total (the number of users) and resets
// them for the first cycle. Steps of other types and rendezvous steps without
// a Rendezvous are skipped. The result is nil when there is no rendezvous.
func InitRendezvous(testcase *TestCase, total int64) []*Rendezvous {
	var rendezvousList []*Rendezvous
	for _, s := range testcase.TestSteps {
		// test cases also contain other step types, so the assertion must be checked
		step, ok := s.(*StepRendezvous)
		if !ok || step == nil || step.Rendezvous == nil {
			continue
		}
		rendezvous := step.Rendezvous

		// either number or percent should be correctly put, otherwise set to default (total)
		switch {
		case rendezvous.Number == 0 && rendezvous.Percent > 0 && rendezvous.Percent <= defaultRendezvousPercent:
			rendezvous.Number = int64(rendezvous.Percent * float32(total))
		case rendezvous.Number > 0 && rendezvous.Number <= total && rendezvous.Percent == 0:
			rendezvous.Percent = float32(rendezvous.Number) / float32(total)
		default:
			log.Warn().
				Str("name", rendezvous.Name).
				Int64("default number", total).
				Float32("default percent", defaultRendezvousPercent).
				Msg("rendezvous parameter not defined or error, set to default value")
			rendezvous.Number = total
			rendezvous.Percent = defaultRendezvousPercent
		}

		if rendezvous.Timeout <= 0 {
			rendezvous.Timeout = defaultRendezvousTimeout
		}
		rendezvous.reset()
		rendezvousList = append(rendezvousList, rendezvous)
	}
	return rendezvousList
}

// WaitRendezvous starts one background goroutine per rendezvous that
// releases it once its condition is met or its timeout elapses.
// It does nothing for a nil or empty list.
func WaitRendezvous(rendezvousList []*Rendezvous, b IBoomer) {
	if len(rendezvousList) == 0 {
		return
	}
	lastRendezvous := rendezvousList[len(rendezvousList)-1]
	for _, rendezvous := range rendezvousList {
		go waitSingleRendezvous(rendezvous, rendezvousList, lastRendezvous, b)
	}
}

// waitSingleRendezvous is the release loop of a single rendezvous. It never
// returns. In every cycle it waits for the rendezvous to be activated, then
// releases it when the WaitGroup reaches zero or when no user has arrived for
// Timeout milliseconds. The goroutine of the last rendezvous additionally
// resets every rendezvous for the next cycle.
func waitSingleRendezvous(rendezvous *Rendezvous, rendezvousList []*Rendezvous, lastRendezvous *Rendezvous, b IBoomer) {
	for {
		// cycle start: block current checking until current rendezvous activated
		// NOTE(review): activateChan and lastRendezvous.releaseChan are read
		// without synchronization against reset running in the goroutine of
		// the last rendezvous; a waiter waking up before reset has finished
		// may still observe channels of the previous cycle. Confirm whether a
		// per-cycle barrier is needed.
		<-rendezvous.activateChan

		stop := make(chan struct{})
		timeout := time.Duration(rendezvous.Timeout) * time.Millisecond
		timer := time.NewTimer(timeout)
		go func() {
			defer close(stop)
			rendezvous.wg.Wait()
		}()

		for !rendezvous.isReleased() {
			select {
			case <-rendezvous.timerResetChan:
				// a user arrived: restart the timeout; stop and drain first so
				// a tick that already fired cannot trigger a stale timeout
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(timeout)
			case <-stop:
				rendezvous.release("rendezvous release condition satisfied")
			case <-timer.C:
				rendezvous.release("time's up")
				rendezvous.drainPending()
			}
		}
		// the timer is not needed for the rest of the cycle
		timer.Stop()

		// cycle end: reset all previous rendezvous after last rendezvous released
		// otherwise, block current checker until the last rendezvous end
		if rendezvous == lastRendezvous {
			spawnCount := int64(b.GetSpawnCount())
			for _, r := range rendezvousList {
				// dynamic adjustment based on the number of concurrent users;
				// done before reset so the WaitGroup is armed with the same
				// Number that Run compares the arrival counter against
				r.updateRendezvousNumber(spawnCount)
				r.reset()
			}
		} else {
			<-lastRendezvous.releaseChan
		}
	}
}

// IBoomer provides the current number of spawned users, which is used to
// re-scale the rendezvous numbers at the end of every cycle.
type IBoomer interface {
	GetSpawnCount() int
}