From a61714c71680a8d0f70e65bdcc490df120499119 Mon Sep 17 00:00:00 2001 From: Inanc Gumus Date: Wed, 29 Sep 2021 14:23:51 +0300 Subject: [PATCH] Refactor ramping VUs executor RampingVUs.Run method was complex and this refactors it by adding rampingVUsRunState to share run state between functions used by the Run method. + Makes the Run method easier to read and understand + Makes it explicit which Goroutines are being fired in the Run + Separates responsibilities to other parts like: + rampingVUsRunState and its methods. + Moves trackProgress Goroutine from the makeProgressFn to the Run method + Makes initializeVUs to only handle initializing GRs + Uses two strategy functions to make them reusable and clearer. handleVUs and handleRemainingVUs use them. + Makes the handleVUs algorithm easier to understand and manage. `go get` was failing on Go tip. --- .github/workflows/xk6.yml | 2 +- lib/executor/ramping_vus.go | 271 ++++++++++++++++++++---------------- 2 files changed, 152 insertions(+), 121 deletions(-) diff --git a/.github/workflows/xk6.yml b/.github/workflows/xk6.yml index c9c66a931e8..171c692dfee 100644 --- a/.github/workflows/xk6.yml +++ b/.github/workflows/xk6.yml @@ -50,7 +50,7 @@ jobs: cd .github/workflows/xk6-tests # Temporary hack to ignore the v0.33.0 tag change export GONOSUMDB="go.k6.io/k6" - go get go.k6.io/xk6/cmd/xk6@master + go install go.k6.io/xk6/cmd/xk6@latest if [ "${{ github.event_name }}" == "pull_request" -a \ "${{ github.event.pull_request.head.repo.full_name }}" != "${{ github.repository }}" ]; then export XK6_K6_REPO="github.com/${{ github.event.pull_request.head.repo.full_name }}" diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 5a00aa5f107..5c55c6eb85b 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -493,154 +493,185 @@ var _ lib.Executor = &RampingVUs{} // Run constantly loops through as many iterations as possible on a variable // number of VUs for the specified stages. -// -// TODO: split up? since this does a ton of things, unfortunately I can't think -// of a less complex way to implement it (besides the old "increment by 100ms -// and see what happens)... :/ so maybe see how it can be split? -// nolint:funlen,gocognit,cyclop -func (vlv RampingVUs) Run( - parentCtx context.Context, out chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics, -) (err error) { - rawExecutionSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true) - regularDuration, isFinal := lib.GetEndOffset(rawExecutionSteps) - if !isFinal { - return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regularDuration) +func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics) error { + stepsRaw := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true) + regDur, finalRaw := lib.GetEndOffset(stepsRaw) + if !finalRaw { + return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regDur) } - - gracefulExecutionSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple) - maxDuration, isFinal := lib.GetEndOffset(gracefulExecutionSteps) - if !isFinal { - return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDuration) + stepsGraceful := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple) + maxDur, finalGraceful := lib.GetEndOffset(stepsGraceful) + if !finalGraceful { + return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDur) } - maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps) - gracefulStop := maxDuration - regularDuration - - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, regularDuration, gracefulStop) + startMaxVUs := lib.GetMaxPlannedVUs(stepsGraceful) + startTime, maxDurCtx, regDurCtx, cancel := getDurationContexts(ctx, regDur, maxDur-regDur) defer cancel() - activeVUs := &sync.WaitGroup{} - defer activeVUs.Wait() - - // Make sure the log and the progress bar have accurate information vlv.logger.WithFields(logrus.Fields{ - "type": vlv.config.GetType(), "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), "maxVUs": maxVUs, - "duration": regularDuration, "numStages": len(vlv.config.Stages), - }, - ).Debug("Starting executor run...") - - activeVUsCount := new(int64) - vusFmt := pb.GetFixedLengthIntFormat(int64(maxVUs)) - regularDurationStr := pb.GetFixedLengthDuration(regularDuration, regularDuration) - progressFn := func() (float64, []string) { - spent := time.Since(startTime) - currentlyActiveVUs := atomic.LoadInt64(activeVUsCount) - vus := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs) - if spent > regularDuration { - return 1, []string{vus, regularDuration.String()} - } - progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs) - progDur := pb.GetFixedLengthDuration(spent, regularDuration) + "/" + regularDurationStr - return float64(spent) / float64(regularDuration), []string{progVUs, progDur} + "type": vlv.config.GetType(), + "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), + "maxVUs": startMaxVUs, + "duration": regDur, + "numStages": len(vlv.config.Stages), + }).Debug("Starting executor run...") + + runState := &rampingVUsRunState{ + executor: vlv, + wg: new(sync.WaitGroup), + vuHandles: make([]*vuHandle, startMaxVUs), + maxVUs: startMaxVUs, + activeVUsCount: new(int64), + started: startTime, + stepsRaw: stepsRaw, + stepsGraceful: stepsGraceful, + runIteration: getIterationRunner(vlv.executionState, vlv.logger), } - vlv.progress.Modify(pb.WithProgress(progressFn)) - go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, vlv, progressFn) - // Actually schedule the VUs and iterations, likely the most complicated - // executor among all of them... - runIteration := getIterationRunner(vlv.executionState, vlv.logger) - getVU := func() (lib.InitializedVU, error) { - initVU, err := vlv.executionState.GetPlannedVU(vlv.logger, false) - if err != nil { - vlv.logger.WithError(err).Error("Cannot get a VU from the buffer") - cancel() - } else { - activeVUs.Add(1) - atomic.AddInt64(activeVUsCount, 1) - vlv.executionState.ModCurrentlyActiveVUsCount(+1) - } - return initVU, err - } - returnVU := func(initVU lib.InitializedVU) { - vlv.executionState.ReturnVU(initVU, false) - atomic.AddInt64(activeVUsCount, -1) - activeVUs.Done() - vlv.executionState.ModCurrentlyActiveVUsCount(-1) - } - - maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{ + progressFn := runState.makeProgressFn(regDur) + maxDurCtx = lib.WithScenarioState(maxDurCtx, &lib.ScenarioState{ Name: vlv.config.Name, Executor: vlv.config.Type, - StartTime: startTime, + StartTime: runState.started, ProgressFn: progressFn, }) + vlv.progress.Modify(pb.WithProgress(progressFn)) + go trackProgress(ctx, maxDurCtx, regDurCtx, vlv, progressFn) - vuHandles := make([]*vuHandle, maxVUs) - for i := uint64(0); i < maxVUs; i++ { - vuHandle := newStoppedVUHandle( - maxDurationCtx, getVU, returnVU, vlv.nextIterationCounters, - &vlv.config.BaseConfig, vlv.logger.WithField("vuNum", i)) - go vuHandle.runLoopsIfPossible(runIteration) - vuHandles[i] = vuHandle + defer runState.wg.Wait() + runState.initializeVUs(maxDurCtx, cancel) + for i := uint64(0); i < runState.maxVUs; i++ { + go runState.vuHandles[i].runLoopsIfPossible(runState.runIteration) } + runState.handleVUs(ctx) + go runState.handleRemainingVUs(ctx) - // 0 <= currentScheduledVUs <= currentMaxAllowedVUs <= maxVUs - var currentScheduledVUs, currentMaxAllowedVUs uint64 + return nil +} - handleNewScheduledVUs := func(newScheduledVUs uint64) { - if newScheduledVUs > currentScheduledVUs { - for vuNum := currentScheduledVUs; vuNum < newScheduledVUs; vuNum++ { - _ = vuHandles[vuNum].start() // TODO handle error - } - } else { - for vuNum := newScheduledVUs; vuNum < currentScheduledVUs; vuNum++ { - vuHandles[vuNum].gracefulStop() - } +// rampingVUsRunState is created and initialized by the Run() method +// of the ramping VUs executor. It is used to track and modify various +// details of the execution. +type rampingVUsRunState struct { + executor RampingVUs + vuHandles []*vuHandle // handles for manipulating and tracking all of the VUs + maxVUs uint64 // the scaled number of initially configured MaxVUs + activeVUsCount *int64 // the current number of active VUs, used only for the progress display + started time.Time + stepsRaw, stepsGraceful []lib.ExecutionStep + wg *sync.WaitGroup + + runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration +} + +func (rs rampingVUsRunState) makeProgressFn(total time.Duration) (progressFn func() (float64, []string)) { + vusFmt := pb.GetFixedLengthIntFormat(int64(rs.maxVUs)) + regDuration := pb.GetFixedLengthDuration(total, total) + + return func() (float64, []string) { + spent := time.Since(rs.started) + cur := atomic.LoadInt64(rs.activeVUsCount) + progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", cur, rs.maxVUs) + if spent > total { + return 1, []string{progVUs, total.String()} } - currentScheduledVUs = newScheduledVUs + progDur := pb.GetFixedLengthDuration(spent, total) + "/" + regDuration + return float64(spent) / float64(total), []string{progVUs, progDur} } +} - handleNewMaxAllowedVUs := func(newMaxAllowedVUs uint64) { - if newMaxAllowedVUs < currentMaxAllowedVUs { - for vuNum := newMaxAllowedVUs; vuNum < currentMaxAllowedVUs; vuNum++ { - vuHandles[vuNum].hardStop() - } +func (rs rampingVUsRunState) initializeVUs(ctx context.Context, cancel func()) { + getVU := func() (lib.InitializedVU, error) { + pvu, err := rs.executor.executionState.GetPlannedVU(rs.executor.logger, false) + if err != nil { + rs.executor.logger.WithError(err).Error("Cannot get a VU from the buffer") + cancel() + return pvu, err } - currentMaxAllowedVUs = newMaxAllowedVUs + rs.wg.Add(1) + atomic.AddInt64(rs.activeVUsCount, 1) + rs.executor.executionState.ModCurrentlyActiveVUsCount(+1) + return pvu, err + } + returnVU := func(initVU lib.InitializedVU) { + rs.executor.executionState.ReturnVU(initVU, false) + atomic.AddInt64(rs.activeVUsCount, -1) + rs.wg.Done() + rs.executor.executionState.ModCurrentlyActiveVUsCount(-1) } + for i := uint64(0); i < rs.maxVUs; i++ { + rs.vuHandles[i] = newStoppedVUHandle( + ctx, getVU, returnVU, rs.executor.nextIterationCounters, + &rs.executor.config.BaseConfig, rs.executor.logger.WithField("vuNum", i)) + } +} - wait := waiter(parentCtx, startTime) - // iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset - // giving rawExecutionSteps precedence. - // we stop iterating once rawExecutionSteps are over as we need to run the remaining - // gracefulExecutionSteps concurrently while waiting for VUs to stop in order to not wait until - // the end of gracefulStop timeouts - i, j := 0, 0 - for i != len(rawExecutionSteps) { - if rawExecutionSteps[i].TimeOffset > gracefulExecutionSteps[j].TimeOffset { - if wait(gracefulExecutionSteps[j].TimeOffset) { +func (rs rampingVUsRunState) handleVUs(ctx context.Context) { + // iterate over stepsRaw and stepsGraceful in order by TimeOffset + // giving stepsRaw precedence. + // we stop iterating once stepsRaw are over as we need to run the remaining + // stepsGraceful concurrently while waiting for VUs to stop in order to not wait until + // the end of gracefulStop (= maxDur-regDur) timeouts + var ( + handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy() + handleNewScheduledVUs = rs.scheduledVUsHandlerStrategy() + wait = waiter(ctx, rs.started) + ) + for i, j := 0, 0; i != len(rs.stepsRaw); { + r, g := rs.stepsRaw[i], rs.stepsGraceful[j] + if g.TimeOffset < r.TimeOffset { + j++ + if wait(g.TimeOffset) { return } - handleNewMaxAllowedVUs(gracefulExecutionSteps[j].PlannedVUs) - j++ + handleNewMaxAllowedVUs(g) } else { - if wait(rawExecutionSteps[i].TimeOffset) { + i++ + if wait(r.TimeOffset) { return } - handleNewScheduledVUs(rawExecutionSteps[i].PlannedVUs) - i++ + handleNewScheduledVUs(r) } } +} - go func() { // iterate over the remaining gracefulExecutionSteps - for _, step := range gracefulExecutionSteps[j:] { - if wait(step.TimeOffset) { - return - } - handleNewMaxAllowedVUs(step.PlannedVUs) +// TODO: removing this has no effect on tests? +func (rs rampingVUsRunState) handleRemainingVUs(ctx context.Context) { + var ( + handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy() + wait = waiter(ctx, rs.started) + ) + for _, s := range rs.stepsGraceful { + if wait(s.TimeOffset) { + return } - }() + handleNewMaxAllowedVUs(s) + } +} - return nil +func (rs rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionStep) { + var cur uint64 + return func(graceful lib.ExecutionStep) { + pv := graceful.PlannedVUs + for ; pv < cur; cur-- { + rs.vuHandles[cur-1].hardStop() + } + cur = pv + } +} + +func (rs rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionStep) { + var cur uint64 + return func(raw lib.ExecutionStep) { + pv := raw.PlannedVUs + for ; cur < pv; cur++ { + rs.vuHandles[cur].start() + } + for ; pv < cur; cur-- { + rs.vuHandles[cur-1].gracefulStop() + } + cur = pv + } } // waiter returns a function that will sleep/wait for the required time since the startTime and then @@ -651,9 +682,9 @@ func (vlv RampingVUs) Run( func waiter(ctx context.Context, startTime time.Time) func(offset time.Duration) bool { timer := time.NewTimer(time.Hour * 24) return func(offset time.Duration) bool { - offsetDiff := offset - time.Since(startTime) - if offsetDiff > 0 { // wait until time of event arrives // TODO have a mininum - timer.Reset(offsetDiff) + diff := offset - time.Since(startTime) + if diff > 0 { // wait until time of event arrives // TODO have a mininum + timer.Reset(diff) select { case <-ctx.Done(): return true // exit if context is cancelled