diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 4bd73272558..18a07b60306 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -469,7 +469,7 @@ func (vlvc RampingVUsConfig) GetExecutionRequirements(et *lib.ExecutionTuple) [] // NewExecutor creates a new RampingVUs executor func (vlvc RampingVUsConfig) NewExecutor(es *lib.ExecutionState, logger *logrus.Entry) (lib.Executor, error) { - return RampingVUs{ + return &RampingVUs{ BaseExecutor: NewBaseExecutor(vlvc, es, logger), config: vlvc, }, nil @@ -486,174 +486,233 @@ func (vlvc RampingVUsConfig) HasWork(et *lib.ExecutionTuple) bool { type RampingVUs struct { *BaseExecutor config RampingVUsConfig + + rawSteps, gracefulSteps []lib.ExecutionStep } // Make sure we implement the lib.Executor interface. var _ lib.Executor = &RampingVUs{} +// Init initializes the rampingVUs executor by precalculating the raw +// and graceful steps. +func (vlv *RampingVUs) Init(_ context.Context) error { + vlv.rawSteps = vlv.config.getRawExecutionSteps( + vlv.executionState.ExecutionTuple, true, + ) + vlv.gracefulSteps = vlv.config.GetExecutionRequirements( + vlv.executionState.ExecutionTuple, + ) + return nil +} + // Run constantly loops through as many iterations as possible on a variable // number of VUs for the specified stages. -// -// TODO: split up? since this does a ton of things, unfortunately I can't think -// of a less complex way to implement it (besides the old "increment by 100ms -// and see what happens)... :/ so maybe see how it can be split? -// nolint:funlen,gocognit,cyclop -func (vlv RampingVUs) Run( - parentCtx context.Context, out chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics, -) (err error) { - rawExecutionSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true) - regularDuration, isFinal := lib.GetEndOffset(rawExecutionSteps) +func (vlv *RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics) error { + regularDuration, isFinal := lib.GetEndOffset(vlv.rawSteps) if !isFinal { return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regularDuration) } - - gracefulExecutionSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple) - maxDuration, isFinal := lib.GetEndOffset(gracefulExecutionSteps) + maxDuration, isFinal := lib.GetEndOffset(vlv.gracefulSteps) if !isFinal { return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDuration) } - maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps) - gracefulStop := maxDuration - regularDuration - - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, regularDuration, gracefulStop) + startTime, maxDurationCtx, regularDurationCtx, cancel := getDurationContexts( + ctx, regularDuration, maxDuration-regularDuration, + ) defer cancel() - activeVUs := &sync.WaitGroup{} - defer activeVUs.Wait() + maxVUs := lib.GetMaxPlannedVUs(vlv.gracefulSteps) - // Make sure the log and the progress bar have accurate information vlv.logger.WithFields(logrus.Fields{ - "type": vlv.config.GetType(), "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), "maxVUs": maxVUs, - "duration": regularDuration, "numStages": len(vlv.config.Stages), - }, - ).Debug("Starting executor run...") - - activeVUsCount := new(int64) - vusFmt := pb.GetFixedLengthIntFormat(int64(maxVUs)) - regularDurationStr := pb.GetFixedLengthDuration(regularDuration, regularDuration) - progressFn := 
func() (float64, []string) { - spent := time.Since(startTime) - currentlyActiveVUs := atomic.LoadInt64(activeVUsCount) - vus := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs) - if spent > regularDuration { - return 1, []string{vus, regularDuration.String()} - } - progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs) - progDur := pb.GetFixedLengthDuration(spent, regularDuration) + "/" + regularDurationStr - return float64(spent) / float64(regularDuration), []string{progVUs, progDur} - } - vlv.progress.Modify(pb.WithProgress(progressFn)) - go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, vlv, progressFn) - - // Actually schedule the VUs and iterations, likely the most complicated - // executor among all of them... - runIteration := getIterationRunner(vlv.executionState, vlv.logger) - getVU := func() (lib.InitializedVU, error) { - initVU, err := vlv.executionState.GetPlannedVU(vlv.logger, false) - if err != nil { - vlv.logger.WithError(err).Error("Cannot get a VU from the buffer") - cancel() - } else { - activeVUs.Add(1) - atomic.AddInt64(activeVUsCount, 1) - vlv.executionState.ModCurrentlyActiveVUsCount(+1) - } - return initVU, err - } - returnVU := func(initVU lib.InitializedVU) { - vlv.executionState.ReturnVU(initVU, false) - atomic.AddInt64(activeVUsCount, -1) - activeVUs.Done() - vlv.executionState.ModCurrentlyActiveVUsCount(-1) + "type": vlv.config.GetType(), + "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), + "maxVUs": maxVUs, + "duration": regularDuration, + "numStages": len(vlv.config.Stages), + }).Debug("Starting executor run...") + + runState := &rampingVUsRunState{ + executor: vlv, + vuHandles: make([]*vuHandle, maxVUs), + maxVUs: maxVUs, + activeVUsCount: new(int64), + started: startTime, + runIteration: getIterationRunner(vlv.executionState, vlv.logger), } + progressFn := runState.makeProgressFn(regularDuration) maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{ Name: vlv.config.Name, Executor: vlv.config.Type, - StartTime: startTime, + StartTime: runState.started, ProgressFn: progressFn, }) + vlv.progress.Modify(pb.WithProgress(progressFn)) + go trackProgress(ctx, maxDurationCtx, regularDurationCtx, vlv, progressFn) - vuHandles := make([]*vuHandle, maxVUs) - for i := uint64(0); i < maxVUs; i++ { - vuHandle := newStoppedVUHandle( - maxDurationCtx, getVU, returnVU, vlv.nextIterationCounters, - &vlv.config.BaseConfig, vlv.logger.WithField("vuNum", i)) - go vuHandle.runLoopsIfPossible(runIteration) - vuHandles[i] = vuHandle - } + defer runState.wg.Wait() + // this will populate stopped VUs and run runLoopsIfPossible on each VU + // handle in a new goroutine + runState.runLoopsIfPossible(maxDurationCtx, cancel) - // 0 <= currentScheduledVUs <= currentMaxAllowedVUs <= maxVUs - var currentScheduledVUs, currentMaxAllowedVUs uint64 + var ( + handleNewMaxAllowedVUs = runState.maxAllowedVUsHandlerStrategy() + handleNewScheduledVUs = runState.scheduledVUsHandlerStrategy() + ) + handledGracefulSteps := runState.iterateSteps( + ctx, + handleNewMaxAllowedVUs, + handleNewScheduledVUs, + ) + go runState.runRemainingGracefulSteps( + ctx, + handleNewMaxAllowedVUs, + handledGracefulSteps, + ) + return nil +} - handleNewScheduledVUs := func(newScheduledVUs uint64) { - if newScheduledVUs > currentScheduledVUs { - for vuNum := currentScheduledVUs; vuNum < newScheduledVUs; vuNum++ { - _ = vuHandles[vuNum].start() // TODO handle error - } - } else { - for vuNum := newScheduledVUs; vuNum < 
currentScheduledVUs; vuNum++ { - vuHandles[vuNum].gracefulStop() - } +// rampingVUsRunState is created and initialized by the Run() method +// of the ramping VUs executor. It is used to track and modify various +// details of the execution. +type rampingVUsRunState struct { + executor *RampingVUs + vuHandles []*vuHandle // handles for manipulating and tracking all of the VUs + maxVUs uint64 // the scaled number of initially configured MaxVUs + activeVUsCount *int64 // the current number of active VUs, used only for the progress display + started time.Time + wg sync.WaitGroup + + runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration +} + +func (rs *rampingVUsRunState) makeProgressFn(regular time.Duration) (progressFn func() (float64, []string)) { + vusFmt := pb.GetFixedLengthIntFormat(int64(rs.maxVUs)) + regularDuration := pb.GetFixedLengthDuration(regular, regular) + + return func() (float64, []string) { + spent := time.Since(rs.started) + cur := atomic.LoadInt64(rs.activeVUsCount) + progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", cur, rs.maxVUs) + if spent > regular { + return 1, []string{progVUs, regular.String()} } - currentScheduledVUs = newScheduledVUs + status := pb.GetFixedLengthDuration(spent, regular) + "/" + regularDuration + return float64(spent) / float64(regular), []string{progVUs, status} } +} - handleNewMaxAllowedVUs := func(newMaxAllowedVUs uint64) { - if newMaxAllowedVUs < currentMaxAllowedVUs { - for vuNum := newMaxAllowedVUs; vuNum < currentMaxAllowedVUs; vuNum++ { - vuHandles[vuNum].hardStop() - } +func (rs *rampingVUsRunState) runLoopsIfPossible(ctx context.Context, cancel func()) { + getVU := func() (lib.InitializedVU, error) { + pvu, err := rs.executor.executionState.GetPlannedVU(rs.executor.logger, false) + if err != nil { + rs.executor.logger.WithError(err).Error("Cannot get a VU from the buffer") + cancel() + return pvu, err } - currentMaxAllowedVUs = newMaxAllowedVUs + rs.wg.Add(1) + atomic.AddInt64(rs.activeVUsCount, 1) + rs.executor.executionState.ModCurrentlyActiveVUsCount(+1) + return pvu, err + } + returnVU := func(initVU lib.InitializedVU) { + rs.executor.executionState.ReturnVU(initVU, false) + atomic.AddInt64(rs.activeVUsCount, -1) + rs.wg.Done() + rs.executor.executionState.ModCurrentlyActiveVUsCount(-1) } + for i := uint64(0); i < rs.maxVUs; i++ { + rs.vuHandles[i] = newStoppedVUHandle( + ctx, getVU, returnVU, rs.executor.nextIterationCounters, + &rs.executor.config.BaseConfig, rs.executor.logger.WithField("vuNum", i)) + go rs.vuHandles[i].runLoopsIfPossible(rs.runIteration) + } +} - wait := waiter(parentCtx, startTime) - // iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset - // giving rawExecutionSteps precedence. - // we stop iterating once rawExecutionSteps are over as we need to run the remaining - // gracefulExecutionSteps concurrently while waiting for VUs to stop in order to not wait until - // the end of gracefulStop timeouts +// iterateSteps iterates over rawSteps and gracefulSteps in order according to +// their TimeOffsets, prioritizing rawSteps. It stops iterating once rawSteps +// are over. And it returns the number of handled gracefulSteps. 
+func (rs *rampingVUsRunState) iterateSteps( + ctx context.Context, + handleNewMaxAllowedVUs, handleNewScheduledVUs func(lib.ExecutionStep), +) (handledGracefulSteps int) { + wait := waiter(ctx, rs.started) i, j := 0, 0 - for i != len(rawExecutionSteps) { - if rawExecutionSteps[i].TimeOffset > gracefulExecutionSteps[j].TimeOffset { - if wait(gracefulExecutionSteps[j].TimeOffset) { - return + for i != len(rs.executor.rawSteps) { + r, g := rs.executor.rawSteps[i], rs.executor.gracefulSteps[j] + if g.TimeOffset < r.TimeOffset { + if wait(g.TimeOffset) { + break } - handleNewMaxAllowedVUs(gracefulExecutionSteps[j].PlannedVUs) + handleNewMaxAllowedVUs(g) j++ } else { - if wait(rawExecutionSteps[i].TimeOffset) { - return + if wait(r.TimeOffset) { + break } - handleNewScheduledVUs(rawExecutionSteps[i].PlannedVUs) + handleNewScheduledVUs(r) i++ } } + return j +} - go func() { // iterate over the remaining gracefulExecutionSteps - for _, step := range gracefulExecutionSteps[j:] { - if wait(step.TimeOffset) { - return - } - handleNewMaxAllowedVUs(step.PlannedVUs) +// runRemainingGracefulSteps runs the remaining gracefulSteps concurrently +// before the gracefulStop timeout period stops VUs. +// +// This way we will have run the gracefulSteps at the same time while +// waiting for the VUs to finish. +// +// (gracefulStop = maxDuration-regularDuration) +func (rs *rampingVUsRunState) runRemainingGracefulSteps( + ctx context.Context, + handleNewMaxAllowedVUs func(lib.ExecutionStep), + handledGracefulSteps int, +) { + wait := waiter(ctx, rs.started) + for _, s := range rs.executor.gracefulSteps[handledGracefulSteps:] { + if wait(s.TimeOffset) { + return } - }() + handleNewMaxAllowedVUs(s) + } +} - return nil +func (rs *rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionStep) { + var cur uint64 // current number of planned graceful VUs + return func(graceful lib.ExecutionStep) { + pv := graceful.PlannedVUs + for ; pv < cur; cur-- { + rs.vuHandles[cur-1].hardStop() + } + cur = pv + } +} + +func (rs *rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionStep) { + var cur uint64 // current number of planned raw VUs + return func(raw lib.ExecutionStep) { + pv := raw.PlannedVUs + for ; cur < pv; cur++ { + _ = rs.vuHandles[cur].start() // TODO: handle the error + } + for ; pv < cur; cur-- { + rs.vuHandles[cur-1].gracefulStop() + } + } } // waiter returns a function that will sleep/wait for the required time since the startTime and then // return. If the context was done before that it will return true otherwise it will return false // TODO use elsewhere -// TODO set startTime here? +// TODO set start here? 
 // TODO move it to a struct type or something and benchmark if that makes a difference
-func waiter(ctx context.Context, startTime time.Time) func(offset time.Duration) bool {
+func waiter(ctx context.Context, start time.Time) func(offset time.Duration) bool {
 	timer := time.NewTimer(time.Hour * 24)
 	return func(offset time.Duration) bool {
-		offsetDiff := offset - time.Since(startTime)
-		if offsetDiff > 0 { // wait until time of event arrives // TODO have a mininum
-			timer.Reset(offsetDiff)
+		diff := offset - time.Since(start)
+		if diff > 0 { // wait until time of event arrives // TODO have a minimum
+			timer.Reset(diff)
 			select {
 			case <-ctx.Done():
 				return true // exit if context is cancelled
diff --git a/lib/executor/ramping_vus_test.go b/lib/executor/ramping_vus_test.go
index 5ed49ebf3eb..f129b1c448c 100644
--- a/lib/executor/ramping_vus_test.go
+++ b/lib/executor/ramping_vus_test.go
@@ -280,6 +280,85 @@ func TestRampingVUsGracefulRampDown(t *testing.T) {
 	}
 }
 
+// This test aims to check whether the ramping VU executor interrupts
+// hanging/remaining VUs after the graceful rampdown period finishes.
+//
+//        Rampdown              Graceful Rampdown
+//  Stage (40ms)                (+30ms)
+//      [                      ][              ]
+// t 0---5---10---20---30---40---50---60---70
+// VU1 *..................................✔ (40+30=70ms)
+// VU2 *...................X (20+30=50ms)
+//
+// ✔=Finishes, X=Interrupted, .=Sleeps
+func TestRampingVUsHandleRemainingVUs(t *testing.T) {
+	t.Parallel()
+
+	const (
+		maxVus                   = 2
+		vuSleepDuration          = 65 * time.Millisecond // Each VU will sleep 65ms
+		wantVuFinished    uint32 = 1                     // one VU should finish an iteration
+		wantVuInterrupted uint32 = 1                     // one VU should be interrupted
+	)
+
+	cfg := RampingVUsConfig{
+		BaseConfig: BaseConfig{
+			// Extend the total test duration by 50ms:
+			//
+			// test duration = sum(stages) + GracefulStop
+			//
+			// This could have been 30ms, but we increased it to 50ms
+			// to prevent the test from becoming flaky.
+			GracefulStop: types.NullDurationFrom(50 * time.Millisecond),
+		},
+		// Wait 30ms more for already started iterations
+		// (happens in the 2nd stage below: the graceful rampdown period)
+		GracefulRampDown: types.NullDurationFrom(30 * time.Millisecond),
+		// Total test duration is 50ms (excluding the GracefulRampDown period)
+		Stages: []Stage{
+			// Activate 2 VUs in 10ms
+			{
+				Duration: types.NullDurationFrom(10 * time.Millisecond),
+				Target:   null.IntFrom(int64(maxVus)),
+			},
+			// Ramp down to 0 VUs in 40ms
+			{
+				Duration: types.NullDurationFrom(40 * time.Millisecond),
+				Target:   null.IntFrom(int64(0)),
+			},
+		},
+	}
+
+	var (
+		gotVuInterrupted uint32
+		gotVuFinished    uint32
+	)
+	iteration := func(ctx context.Context) error {
+		select {
+		case <-time.After(vuSleepDuration):
+			atomic.AddUint32(&gotVuFinished, 1)
+		case <-ctx.Done():
+			atomic.AddUint32(&gotVuInterrupted, 1)
+		}
+		return nil
+	}
+
+	// Run the executor: this should finish in ~70ms,
+	// i.e. sum(stages) + GracefulRampDown.
+	et, err := lib.NewExecutionTuple(nil, nil)
+	require.NoError(t, err)
+	ctx, cancel, executor, _ := setupExecutor(
+		t, cfg,
+		lib.NewExecutionState(lib.Options{}, et, maxVus, maxVus),
+		simpleRunner(iteration),
+	)
+	defer cancel()
+	require.NoError(t, executor.Run(ctx, nil, nil))
+
+	assert.Equal(t, wantVuInterrupted, atomic.LoadUint32(&gotVuInterrupted))
+	assert.Equal(t, wantVuFinished, atomic.LoadUint32(&gotVuFinished))
+}
+
 // Ensure there's no wobble of VUs during graceful ramp-down, without segments.
 // See https://github.com/k6io/k6/issues/1296
 func TestRampingVUsRampDownNoWobble(t *testing.T) {
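
For reference, the step-merging idea behind iterateSteps can be shown with a small standalone sketch (not part of this patch): raw and graceful steps are consumed in TimeOffset order, raw steps win ties, iteration stops once the raw steps run out, and the number of graceful steps handled so far is returned so the remainder can be processed concurrently (as runRemainingGracefulSteps does). The step type and the handler closures below are simplified stand-ins for lib.ExecutionStep and the real VU handlers, and the context-aware waiting is intentionally omitted.

package main

import (
	"fmt"
	"time"
)

// step is a simplified stand-in for lib.ExecutionStep.
type step struct {
	TimeOffset time.Duration
	PlannedVUs uint64
}

// mergeSteps walks raw and graceful steps in TimeOffset order (raw steps win
// ties), calls the matching handler for each, and stops once the raw steps
// are exhausted. It returns how many graceful steps were handled, so the
// caller can process the remaining ones separately.
func mergeSteps(raw, graceful []step, onRaw, onGraceful func(step)) int {
	i, j := 0, 0
	for i != len(raw) {
		r, g := raw[i], graceful[j]
		if g.TimeOffset < r.TimeOffset {
			onGraceful(g)
			j++
		} else {
			onRaw(r)
			i++
		}
	}
	return j
}

func main() {
	raw := []step{{0, 0}, {5 * time.Second, 2}, {10 * time.Second, 0}}
	graceful := []step{{0, 0}, {5 * time.Second, 2}, {40 * time.Second, 0}}

	handled := mergeSteps(raw, graceful,
		func(s step) { fmt.Printf("%v: scheduled VUs -> %d\n", s.TimeOffset, s.PlannedVUs) },
		func(s step) { fmt.Printf("%v: max allowed VUs -> %d\n", s.TimeOffset, s.PlannedVUs) },
	)
	// The steps in graceful[handled:] would then be handled concurrently
	// while waiting for the VUs to finish, as runRemainingGracefulSteps
	// does in the patch.
	fmt.Println("graceful steps handled during the merge:", handled)
}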