From 32820802aa00f522dfd0851963e5a2f3992699ab Mon Sep 17 00:00:00 2001 From: Inanc Gumus Date: Wed, 29 Sep 2021 14:23:51 +0300 Subject: [PATCH] Refactor ramping VUs executor RampingVUs.Run method was complex and this refactors it by adding rampingVUsRunState to share run state between functions used by the Run method. + Makes the Run method easier to read and understand + Makes it explicit which Goroutines are being fired in the Run + Separates responsibilities to other parts like: + rampingVUsRunState and its methods. + Moves trackProgress Goroutine from the makeProgressFn to the Run method + Makes initializeVUs to only handle initializing GRs + Uses two strategy functions to make them reusable and clearer. handleVUs and handleRemainingVUs use them. + Makes the handleVUs algorithm easier to understand and manage. --- lib/executor/ramping_vus.go | 280 ++++++++++++++++++++---------------- 1 file changed, 160 insertions(+), 120 deletions(-) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 5a00aa5f1070..caf7096fc0a0 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -493,154 +493,194 @@ var _ lib.Executor = &RampingVUs{} // Run constantly loops through as many iterations as possible on a variable // number of VUs for the specified stages. -// -// TODO: split up? since this does a ton of things, unfortunately I can't think -// of a less complex way to implement it (besides the old "increment by 100ms -// and see what happens)... :/ so maybe see how it can be split? -// nolint:funlen,gocognit,cyclop -func (vlv RampingVUs) Run( - parentCtx context.Context, out chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics, -) (err error) { - rawExecutionSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true) - regularDuration, isFinal := lib.GetEndOffset(rawExecutionSteps) - if !isFinal { - return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regularDuration) +func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics) error { + stepsRaw := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true) + regDur, finalRaw := lib.GetEndOffset(stepsRaw) + if !finalRaw { + return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regDur) + } + stepsGraceful := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple) + maxDur, finalGraceful := lib.GetEndOffset(stepsGraceful) + if !finalGraceful { + return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDur) } + startMaxVUs := lib.GetMaxPlannedVUs(stepsGraceful) + startTime, maxDurCtx, regDurCtx, cancel := getDurationContexts(ctx, regDur, maxDur-regDur) + defer cancel() - gracefulExecutionSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple) - maxDuration, isFinal := lib.GetEndOffset(gracefulExecutionSteps) - if !isFinal { - return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDuration) + vlv.logger.WithFields(logrus.Fields{ + "type": vlv.config.GetType(), + "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), + "maxVUs": startMaxVUs, + "duration": regDur, + "numStages": len(vlv.config.Stages), + }).Debug("Starting executor run...") + + runState := &rampingVUsRunState{ + executor: vlv, + wg: new(sync.WaitGroup), + vuHandles: make([]*vuHandle, startMaxVUs), + startMaxVUs: startMaxVUs, + activeVUsCount: new(int64), + started: startTime, + stepsRaw: stepsRaw, + stepsGraceful: stepsGraceful, + runIteration: getIterationRunner(vlv.executionState, vlv.logger), } - maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps) - gracefulStop := maxDuration - regularDuration - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, regularDuration, gracefulStop) - defer cancel() + progressFn := runState.makeProgressFn(regDur) + maxDurCtx = lib.WithScenarioState(maxDurCtx, &lib.ScenarioState{ + Name: vlv.config.Name, + Executor: vlv.config.Type, + StartTime: runState.started, + ProgressFn: progressFn, + }) + vlv.progress.Modify(pb.WithProgress(progressFn)) + go trackProgress(ctx, maxDurCtx, regDurCtx, vlv, progressFn) - activeVUs := &sync.WaitGroup{} - defer activeVUs.Wait() + defer runState.wg.Wait() + runState.initializeVUs(maxDurCtx, cancel) + for i := uint64(0); i < runState.startMaxVUs; i++ { + go runState.vuHandles[i].runLoopsIfPossible(runState.runIteration) + } + runState.handleVUs(ctx) + go runState.handleRemainingVUs(ctx) - // Make sure the log and the progress bar have accurate information - vlv.logger.WithFields(logrus.Fields{ - "type": vlv.config.GetType(), "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), "maxVUs": maxVUs, - "duration": regularDuration, "numStages": len(vlv.config.Stages), - }, - ).Debug("Starting executor run...") - - activeVUsCount := new(int64) - vusFmt := pb.GetFixedLengthIntFormat(int64(maxVUs)) - regularDurationStr := pb.GetFixedLengthDuration(regularDuration, regularDuration) - progressFn := func() (float64, []string) { - spent := time.Since(startTime) - currentlyActiveVUs := atomic.LoadInt64(activeVUsCount) - vus := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs) - if spent > regularDuration { - return 1, []string{vus, regularDuration.String()} + return nil +} + +// rampingVUsRunState is created and initialized by the Run() method +// of the ramping VUs executor. It is used to track and modify various +// details of the execution. +type rampingVUsRunState struct { + executor RampingVUs + vuHandles []*vuHandle // handles for manipulating and tracking all of the VUs + startMaxVUs uint64 // the scaled number of initially configured MaxVUs + activeVUsCount *int64 // the current number of active VUs, used only for the progress display + started time.Time + stepsRaw, stepsGraceful []lib.ExecutionStep + wg *sync.WaitGroup + + runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration +} + +func (rs rampingVUsRunState) makeProgressFn(total time.Duration) (progressFn func() (float64, []string)) { + vusFmt := pb.GetFixedLengthIntFormat(int64(rs.startMaxVUs)) + regDuration := pb.GetFixedLengthDuration(total, total) + + return func() (float64, []string) { + spent := time.Since(rs.started) + cur := atomic.LoadInt64(rs.activeVUsCount) + progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", cur, rs.startMaxVUs) + if spent > total { + return 1, []string{progVUs, total.String()} } - progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs) - progDur := pb.GetFixedLengthDuration(spent, regularDuration) + "/" + regularDurationStr - return float64(spent) / float64(regularDuration), []string{progVUs, progDur} + progDur := pb.GetFixedLengthDuration(spent, total) + "/" + regDuration + return float64(spent) / float64(total), []string{progVUs, progDur} } - vlv.progress.Modify(pb.WithProgress(progressFn)) - go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, vlv, progressFn) +} - // Actually schedule the VUs and iterations, likely the most complicated - // executor among all of them... - runIteration := getIterationRunner(vlv.executionState, vlv.logger) +func (rs rampingVUsRunState) initializeVUs(ctx context.Context, cancel func()) { getVU := func() (lib.InitializedVU, error) { - initVU, err := vlv.executionState.GetPlannedVU(vlv.logger, false) + pvu, err := rs.executor.executionState.GetPlannedVU(rs.executor.logger, false) if err != nil { - vlv.logger.WithError(err).Error("Cannot get a VU from the buffer") + rs.executor.logger.WithError(err).Error("Cannot get a VU from the buffer") cancel() - } else { - activeVUs.Add(1) - atomic.AddInt64(activeVUsCount, 1) - vlv.executionState.ModCurrentlyActiveVUsCount(+1) + return pvu, err } - return initVU, err + rs.wg.Add(1) + atomic.AddInt64(rs.activeVUsCount, 1) + rs.executor.executionState.ModCurrentlyActiveVUsCount(+1) + return pvu, err } returnVU := func(initVU lib.InitializedVU) { - vlv.executionState.ReturnVU(initVU, false) - atomic.AddInt64(activeVUsCount, -1) - activeVUs.Done() - vlv.executionState.ModCurrentlyActiveVUsCount(-1) + rs.executor.executionState.ReturnVU(initVU, false) + atomic.AddInt64(rs.activeVUsCount, -1) + rs.wg.Done() + rs.executor.executionState.ModCurrentlyActiveVUsCount(-1) } - - maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{ - Name: vlv.config.Name, - Executor: vlv.config.Type, - StartTime: startTime, - ProgressFn: progressFn, - }) - - vuHandles := make([]*vuHandle, maxVUs) - for i := uint64(0); i < maxVUs; i++ { - vuHandle := newStoppedVUHandle( - maxDurationCtx, getVU, returnVU, vlv.nextIterationCounters, - &vlv.config.BaseConfig, vlv.logger.WithField("vuNum", i)) - go vuHandle.runLoopsIfPossible(runIteration) - vuHandles[i] = vuHandle + for i := uint64(0); i < rs.startMaxVUs; i++ { + rs.vuHandles[i] = newStoppedVUHandle( + ctx, getVU, returnVU, rs.executor.nextIterationCounters, + &rs.executor.config.BaseConfig, rs.executor.logger.WithField("vuNum", i)) } +} - // 0 <= currentScheduledVUs <= currentMaxAllowedVUs <= maxVUs - var currentScheduledVUs, currentMaxAllowedVUs uint64 - - handleNewScheduledVUs := func(newScheduledVUs uint64) { - if newScheduledVUs > currentScheduledVUs { - for vuNum := currentScheduledVUs; vuNum < newScheduledVUs; vuNum++ { - _ = vuHandles[vuNum].start() // TODO handle error - } +func (rs rampingVUsRunState) handleVUs(ctx context.Context) { + // iterate over stepsRaw and stepsGraceful in order by TimeOffset + // giving stepsRaw precedence. + // we stop iterating once stepsRaw are over as we need to run the remaining + // stepsGraceful concurrently while waiting for VUs to stop in order to not wait until + // the end of gracefulStop (= maxDur-regDur) timeouts + var ( + handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy() + handleNewScheduledVUs = rs.scheduledVUsHandlerStrategy() + wait = waiter(ctx, rs.started) + ) + for len(rs.stepsRaw) > 0 { + var ( + r, g = rs.stepsRaw[0], rs.stepsGraceful[0] + gfirst = r.TimeOffset > g.TimeOffset + offset = g.TimeOffset + ) + if !gfirst { + offset = r.TimeOffset + } + if wait(offset) { + return + } + if gfirst { + handleNewMaxAllowedVUs(g) + rs.stepsGraceful = rs.stepsGraceful[1:] } else { - for vuNum := newScheduledVUs; vuNum < currentScheduledVUs; vuNum++ { - vuHandles[vuNum].gracefulStop() - } + handleNewScheduledVUs(r) + rs.stepsRaw = rs.stepsRaw[1:] + } + } +} + +// TODO: removing this has no effect on tests? +func (rs rampingVUsRunState) handleRemainingVUs(ctx context.Context) { + var ( + handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy() + wait = waiter(ctx, rs.started) + ) + for _, s := range rs.stepsGraceful { + if wait(s.TimeOffset) { + return } - currentScheduledVUs = newScheduledVUs + handleNewMaxAllowedVUs(s) } +} - handleNewMaxAllowedVUs := func(newMaxAllowedVUs uint64) { - if newMaxAllowedVUs < currentMaxAllowedVUs { - for vuNum := newMaxAllowedVUs; vuNum < currentMaxAllowedVUs; vuNum++ { - vuHandles[vuNum].hardStop() +func (rs rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionStep) { + var max uint64 + return func(graceful lib.ExecutionStep) { + cur := graceful.PlannedVUs + if cur < max { + for n := cur; n < max; n++ { + rs.vuHandles[n].hardStop() } } - currentMaxAllowedVUs = newMaxAllowedVUs + max = cur } +} - wait := waiter(parentCtx, startTime) - // iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset - // giving rawExecutionSteps precedence. - // we stop iterating once rawExecutionSteps are over as we need to run the remaining - // gracefulExecutionSteps concurrently while waiting for VUs to stop in order to not wait until - // the end of gracefulStop timeouts - i, j := 0, 0 - for i != len(rawExecutionSteps) { - if rawExecutionSteps[i].TimeOffset > gracefulExecutionSteps[j].TimeOffset { - if wait(gracefulExecutionSteps[j].TimeOffset) { - return +func (rs rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionStep) { + var sched uint64 + return func(raw lib.ExecutionStep) { + cur := raw.PlannedVUs + if cur > sched { + for n := sched; n < cur; n++ { + _ = rs.vuHandles[n].start() // TODO handle error } - handleNewMaxAllowedVUs(gracefulExecutionSteps[j].PlannedVUs) - j++ } else { - if wait(rawExecutionSteps[i].TimeOffset) { - return + for n := cur; n < sched; n++ { + rs.vuHandles[n].gracefulStop() } - handleNewScheduledVUs(rawExecutionSteps[i].PlannedVUs) - i++ } + sched = cur } - - go func() { // iterate over the remaining gracefulExecutionSteps - for _, step := range gracefulExecutionSteps[j:] { - if wait(step.TimeOffset) { - return - } - handleNewMaxAllowedVUs(step.PlannedVUs) - } - }() - - return nil } // waiter returns a function that will sleep/wait for the required time since the startTime and then @@ -651,9 +691,9 @@ func (vlv RampingVUs) Run( func waiter(ctx context.Context, startTime time.Time) func(offset time.Duration) bool { timer := time.NewTimer(time.Hour * 24) return func(offset time.Duration) bool { - offsetDiff := offset - time.Since(startTime) - if offsetDiff > 0 { // wait until time of event arrives // TODO have a mininum - timer.Reset(offsetDiff) + diff := offset - time.Since(startTime) + if diff > 0 { // wait until time of event arrives // TODO have a mininum + timer.Reset(diff) select { case <-ctx.Done(): return true // exit if context is cancelled