From cf3e5ce2d7cee8830c94eaac1ad7c275b088bd59 Mon Sep 17 00:00:00 2001 From: Inanc Gumus Date: Mon, 4 Oct 2021 14:54:00 +0300 Subject: [PATCH 01/10] Refactor ramping VUs executor RampingVUs.Run method was complex and this refactors it by adding rampingVUsRunState to share run state between functions used by the Run method. + Makes the Run method easier to read and understand + Makes it explicit which Goroutines are being fired in the Run + Separates responsibilities to other parts like: + rampingVUsRunState and its methods. + Moves trackProgress Goroutine from the makeProgressFn to the Run method + Makes initializeVUs to only handle initializing GRs + Uses two strategy functions to make them reusable and clearer. handleVUs and handleRemainingVUs use them. + Makes the handleVUs algorithm easier to understand and manage. --- lib/executor/ramping_vus.go | 271 ++++++++++++++++++++---------------- 1 file changed, 151 insertions(+), 120 deletions(-) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 4bd73272558..028c7691713 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -493,154 +493,185 @@ var _ lib.Executor = &RampingVUs{} // Run constantly loops through as many iterations as possible on a variable // number of VUs for the specified stages. -// -// TODO: split up? since this does a ton of things, unfortunately I can't think -// of a less complex way to implement it (besides the old "increment by 100ms -// and see what happens)... :/ so maybe see how it can be split? -// nolint:funlen,gocognit,cyclop -func (vlv RampingVUs) Run( - parentCtx context.Context, out chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics, -) (err error) { - rawExecutionSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true) - regularDuration, isFinal := lib.GetEndOffset(rawExecutionSteps) - if !isFinal { - return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regularDuration) +func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics) error { + rawSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true) + regDur, finalRaw := lib.GetEndOffset(rawSteps) + if !finalRaw { + return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regDur) } - - gracefulExecutionSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple) - maxDuration, isFinal := lib.GetEndOffset(gracefulExecutionSteps) - if !isFinal { - return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDuration) + gracefulSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple) + maxDur, finalGraceful := lib.GetEndOffset(gracefulSteps) + if !finalGraceful { + return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDur) } - maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps) - gracefulStop := maxDuration - regularDuration - - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, regularDuration, gracefulStop) + startMaxVUs := lib.GetMaxPlannedVUs(gracefulSteps) + startTime, maxDurCtx, regDurCtx, cancel := getDurationContexts(ctx, regDur, maxDur-regDur) defer cancel() - activeVUs := &sync.WaitGroup{} - defer activeVUs.Wait() - - // Make sure the log and the progress bar have accurate information vlv.logger.WithFields(logrus.Fields{ - "type": vlv.config.GetType(), "startVUs": 
vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), "maxVUs": maxVUs, - "duration": regularDuration, "numStages": len(vlv.config.Stages), - }, - ).Debug("Starting executor run...") - - activeVUsCount := new(int64) - vusFmt := pb.GetFixedLengthIntFormat(int64(maxVUs)) - regularDurationStr := pb.GetFixedLengthDuration(regularDuration, regularDuration) - progressFn := func() (float64, []string) { - spent := time.Since(startTime) - currentlyActiveVUs := atomic.LoadInt64(activeVUsCount) - vus := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs) - if spent > regularDuration { - return 1, []string{vus, regularDuration.String()} - } - progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs) - progDur := pb.GetFixedLengthDuration(spent, regularDuration) + "/" + regularDurationStr - return float64(spent) / float64(regularDuration), []string{progVUs, progDur} + "type": vlv.config.GetType(), + "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), + "maxVUs": startMaxVUs, + "duration": regDur, + "numStages": len(vlv.config.Stages), + }).Debug("Starting executor run...") + + runState := &rampingVUsRunState{ + executor: vlv, + wg: new(sync.WaitGroup), + vuHandles: make([]*vuHandle, startMaxVUs), + maxVUs: startMaxVUs, + activeVUsCount: new(int64), + started: startTime, + rawSteps: rawSteps, + gracefulSteps: gracefulSteps, + runIteration: getIterationRunner(vlv.executionState, vlv.logger), } - vlv.progress.Modify(pb.WithProgress(progressFn)) - go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, vlv, progressFn) - // Actually schedule the VUs and iterations, likely the most complicated - // executor among all of them... - runIteration := getIterationRunner(vlv.executionState, vlv.logger) - getVU := func() (lib.InitializedVU, error) { - initVU, err := vlv.executionState.GetPlannedVU(vlv.logger, false) - if err != nil { - vlv.logger.WithError(err).Error("Cannot get a VU from the buffer") - cancel() - } else { - activeVUs.Add(1) - atomic.AddInt64(activeVUsCount, 1) - vlv.executionState.ModCurrentlyActiveVUsCount(+1) - } - return initVU, err - } - returnVU := func(initVU lib.InitializedVU) { - vlv.executionState.ReturnVU(initVU, false) - atomic.AddInt64(activeVUsCount, -1) - activeVUs.Done() - vlv.executionState.ModCurrentlyActiveVUsCount(-1) - } - - maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{ + progressFn := runState.makeProgressFn(regDur) + maxDurCtx = lib.WithScenarioState(maxDurCtx, &lib.ScenarioState{ Name: vlv.config.Name, Executor: vlv.config.Type, - StartTime: startTime, + StartTime: runState.started, ProgressFn: progressFn, }) + vlv.progress.Modify(pb.WithProgress(progressFn)) + go trackProgress(ctx, maxDurCtx, regDurCtx, vlv, progressFn) - vuHandles := make([]*vuHandle, maxVUs) - for i := uint64(0); i < maxVUs; i++ { - vuHandle := newStoppedVUHandle( - maxDurationCtx, getVU, returnVU, vlv.nextIterationCounters, - &vlv.config.BaseConfig, vlv.logger.WithField("vuNum", i)) - go vuHandle.runLoopsIfPossible(runIteration) - vuHandles[i] = vuHandle + defer runState.wg.Wait() + runState.populateVUHandles(maxDurCtx, cancel) + for i := uint64(0); i < runState.maxVUs; i++ { + go runState.vuHandles[i].runLoopsIfPossible(runState.runIteration) } + runState.handleVUs(ctx) + go runState.handleRemainingVUs(ctx) - // 0 <= currentScheduledVUs <= currentMaxAllowedVUs <= maxVUs - var currentScheduledVUs, currentMaxAllowedVUs uint64 + return nil +} - handleNewScheduledVUs := func(newScheduledVUs uint64) { - if 
newScheduledVUs > currentScheduledVUs { - for vuNum := currentScheduledVUs; vuNum < newScheduledVUs; vuNum++ { - _ = vuHandles[vuNum].start() // TODO handle error - } - } else { - for vuNum := newScheduledVUs; vuNum < currentScheduledVUs; vuNum++ { - vuHandles[vuNum].gracefulStop() - } +// rampingVUsRunState is created and initialized by the Run() method +// of the ramping VUs executor. It is used to track and modify various +// details of the execution. +type rampingVUsRunState struct { + executor RampingVUs + vuHandles []*vuHandle // handles for manipulating and tracking all of the VUs + maxVUs uint64 // the scaled number of initially configured MaxVUs + activeVUsCount *int64 // the current number of active VUs, used only for the progress display + started time.Time + rawSteps, gracefulSteps []lib.ExecutionStep + wg *sync.WaitGroup + + runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration +} + +func (rs rampingVUsRunState) makeProgressFn(total time.Duration) (progressFn func() (float64, []string)) { + vusFmt := pb.GetFixedLengthIntFormat(int64(rs.maxVUs)) + regDuration := pb.GetFixedLengthDuration(total, total) + + return func() (float64, []string) { + spent := time.Since(rs.started) + cur := atomic.LoadInt64(rs.activeVUsCount) + progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", cur, rs.maxVUs) + if spent > total { + return 1, []string{progVUs, total.String()} } - currentScheduledVUs = newScheduledVUs + progDur := pb.GetFixedLengthDuration(spent, total) + "/" + regDuration + return float64(spent) / float64(total), []string{progVUs, progDur} } +} - handleNewMaxAllowedVUs := func(newMaxAllowedVUs uint64) { - if newMaxAllowedVUs < currentMaxAllowedVUs { - for vuNum := newMaxAllowedVUs; vuNum < currentMaxAllowedVUs; vuNum++ { - vuHandles[vuNum].hardStop() - } +func (rs rampingVUsRunState) populateVUHandles(ctx context.Context, cancel func()) { + getVU := func() (lib.InitializedVU, error) { + pvu, err := rs.executor.executionState.GetPlannedVU(rs.executor.logger, false) + if err != nil { + rs.executor.logger.WithError(err).Error("Cannot get a VU from the buffer") + cancel() + return pvu, err } - currentMaxAllowedVUs = newMaxAllowedVUs + rs.wg.Add(1) + atomic.AddInt64(rs.activeVUsCount, 1) + rs.executor.executionState.ModCurrentlyActiveVUsCount(+1) + return pvu, err + } + returnVU := func(initVU lib.InitializedVU) { + rs.executor.executionState.ReturnVU(initVU, false) + atomic.AddInt64(rs.activeVUsCount, -1) + rs.wg.Done() + rs.executor.executionState.ModCurrentlyActiveVUsCount(-1) } + for i := uint64(0); i < rs.maxVUs; i++ { + rs.vuHandles[i] = newStoppedVUHandle( + ctx, getVU, returnVU, rs.executor.nextIterationCounters, + &rs.executor.config.BaseConfig, rs.executor.logger.WithField("vuNum", i)) + } +} - wait := waiter(parentCtx, startTime) - // iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset - // giving rawExecutionSteps precedence. - // we stop iterating once rawExecutionSteps are over as we need to run the remaining - // gracefulExecutionSteps concurrently while waiting for VUs to stop in order to not wait until - // the end of gracefulStop timeouts - i, j := 0, 0 - for i != len(rawExecutionSteps) { - if rawExecutionSteps[i].TimeOffset > gracefulExecutionSteps[j].TimeOffset { - if wait(gracefulExecutionSteps[j].TimeOffset) { +func (rs rampingVUsRunState) handleVUs(ctx context.Context) { + // iterate over rawSteps and gracefulSteps in order by TimeOffset + // giving rawSteps precedence. 
+ // we stop iterating once rawSteps are over as we need to run the remaining + // gracefulSteps concurrently while waiting for VUs to stop in order to not wait until + // the end of gracefulStop (= maxDur-regDur) timeouts + var ( + handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy() + handleNewScheduledVUs = rs.scheduledVUsHandlerStrategy() + wait = waiter(ctx, rs.started) + ) + for i, j := 0, 0; i != len(rs.rawSteps); { + r, g := rs.rawSteps[i], rs.gracefulSteps[j] + if g.TimeOffset < r.TimeOffset { + j++ + if wait(g.TimeOffset) { return } - handleNewMaxAllowedVUs(gracefulExecutionSteps[j].PlannedVUs) - j++ + handleNewMaxAllowedVUs(g) } else { - if wait(rawExecutionSteps[i].TimeOffset) { + i++ + if wait(r.TimeOffset) { return } - handleNewScheduledVUs(rawExecutionSteps[i].PlannedVUs) - i++ + handleNewScheduledVUs(r) } } +} - go func() { // iterate over the remaining gracefulExecutionSteps - for _, step := range gracefulExecutionSteps[j:] { - if wait(step.TimeOffset) { - return - } - handleNewMaxAllowedVUs(step.PlannedVUs) +// TODO: removing this has no effect on tests? +func (rs rampingVUsRunState) handleRemainingVUs(ctx context.Context) { + var ( + handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy() + wait = waiter(ctx, rs.started) + ) + for _, s := range rs.gracefulSteps { + if wait(s.TimeOffset) { + return } - }() + handleNewMaxAllowedVUs(s) + } +} - return nil +func (rs rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionStep) { + var cur uint64 + return func(graceful lib.ExecutionStep) { + pv := graceful.PlannedVUs + for ; pv < cur; cur-- { + rs.vuHandles[cur-1].hardStop() + } + cur = pv + } +} + +func (rs rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionStep) { + var cur uint64 + return func(raw lib.ExecutionStep) { + pv := raw.PlannedVUs + for ; cur < pv; cur++ { + _ = rs.vuHandles[cur].start() // TODO: handle the error + } + for ; pv < cur; cur-- { + rs.vuHandles[cur-1].gracefulStop() + } + cur = pv + } } // waiter returns a function that will sleep/wait for the required time since the startTime and then @@ -651,9 +682,9 @@ func (vlv RampingVUs) Run( func waiter(ctx context.Context, startTime time.Time) func(offset time.Duration) bool { timer := time.NewTimer(time.Hour * 24) return func(offset time.Duration) bool { - offsetDiff := offset - time.Since(startTime) - if offsetDiff > 0 { // wait until time of event arrives // TODO have a mininum - timer.Reset(offsetDiff) + diff := offset - time.Since(startTime) + if diff > 0 { // wait until time of event arrives // TODO have a mininum + timer.Reset(diff) select { case <-ctx.Done(): return true // exit if context is cancelled From 4d7a286f2d847dc86c04f3039df479ebf3c968f8 Mon Sep 17 00:00:00 2001 From: Inanc Gumus Date: Sun, 24 Oct 2021 18:11:11 +0300 Subject: [PATCH 02/10] Add a test for handleRemainingVUs This test aims to check whether the ramping VU executor interrupts hanging/remaining VUs after the graceful rampdown period finishes. handleRemainingVUs method should be interrupting the VUs that are beyond their allowed time budgets. See the comments in the test for more information. 
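The mechanism under test, in miniature: a hanging iteration only ends early
when the executor cancels its context via hardStop. Below is a compilable
sketch of the kind of iteration body the test installs through simpleRunner
(the counter names and the 65ms sleep are illustrative, taken from the test
that follows, not executor API):

    package sketch

    import (
        "context"
        "sync/atomic"
        "time"
    )

    var finished, interrupted uint32

    // iteration sleeps longer than some VUs' remaining time budget, so those
    // VUs can only end via context cancellation (the hard stop).
    func iteration(ctx context.Context) error {
        select {
        case <-time.After(65 * time.Millisecond): // work outlasted the ramp-down budget: it finished
            atomic.AddUint32(&finished, 1)
        case <-ctx.Done(): // the executor hard-stopped this VU
            atomic.AddUint32(&interrupted, 1)
        }
        return nil
    }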
--- lib/executor/ramping_vus.go | 1 - lib/executor/ramping_vus_test.go | 79 ++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 028c7691713..0909af9d22c 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -635,7 +635,6 @@ func (rs rampingVUsRunState) handleVUs(ctx context.Context) { } } -// TODO: removing this has no effect on tests? func (rs rampingVUsRunState) handleRemainingVUs(ctx context.Context) { var ( handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy() diff --git a/lib/executor/ramping_vus_test.go b/lib/executor/ramping_vus_test.go index 5ed49ebf3eb..f129b1c448c 100644 --- a/lib/executor/ramping_vus_test.go +++ b/lib/executor/ramping_vus_test.go @@ -280,6 +280,85 @@ func TestRampingVUsGracefulRampDown(t *testing.T) { } } +// This test aims to check whether the ramping VU executor interrupts +// hanging/remaining VUs after the graceful rampdown period finishes. +// +// Rampdown Graceful Rampdown +// Stage (40ms) (+30ms) +// [ ][ ] +// t 0---5---10---20---30---40---50---60---70 +// VU1 *..................................✔ (40+30=70ms) +// VU2 *...................X (20+30=50ms) +// +// ✔=Finishes,X=Interrupted,.=Sleeps +func TestRampingVUsHandleRemainingVUs(t *testing.T) { + t.Parallel() + + const ( + maxVus = 2 + vuSleepDuration = 65 * time.Millisecond // Each VU will sleep 65ms + wantVuFinished uint32 = 1 // one VU should finish an iteration + wantVuInterrupted uint32 = 1 // one VU should be interrupted + ) + + cfg := RampingVUsConfig{ + BaseConfig: BaseConfig{ + // Extend the total test duration 50ms more + // + // test duration = sum(stages) + GracefulStop + // + // This could have been 30ms but increased it to 50ms + // to prevent the test to become flaky. + GracefulStop: types.NullDurationFrom(50 * time.Millisecond), + }, + // Wait 30ms more for already started iterations + // (Happens in the 2nd stage below: Graceful rampdown period) + GracefulRampDown: types.NullDurationFrom(30 * time.Millisecond), + // Total test duration is 50ms (excluding the GracefulRampdown period) + Stages: []Stage{ + // Activate 2 VUs in 10ms + { + Duration: types.NullDurationFrom(10 * time.Millisecond), + Target: null.IntFrom(int64(maxVus)), + }, + // Rampdown to 0 VUs in 40ms + { + Duration: types.NullDurationFrom(40 * time.Millisecond), + Target: null.IntFrom(int64(0)), + }, + }, + } + + var ( + gotVuInterrupted uint32 + gotVuFinished uint32 + ) + iteration := func(ctx context.Context) error { + select { + case <-time.After(vuSleepDuration): + atomic.AddUint32(&gotVuFinished, 1) + case <-ctx.Done(): + atomic.AddUint32(&gotVuInterrupted, 1) + } + return nil + } + + // run the executor: this should finish in ~70ms + // sum(stages) + GracefulRampDown + et, err := lib.NewExecutionTuple(nil, nil) + require.NoError(t, err) + ctx, cancel, executor, _ := setupExecutor( + t, cfg, + lib.NewExecutionState(lib.Options{}, et, maxVus, maxVus), + simpleRunner(iteration), + ) + defer cancel() + require.NoError(t, executor.Run(ctx, nil, nil)) + + assert.Equal(t, wantVuInterrupted, atomic.LoadUint32(&gotVuInterrupted)) + assert.Equal(t, wantVuFinished, atomic.LoadUint32(&gotVuFinished)) +} + // Ensure there's no wobble of VUs during graceful ramp-down, without segments. 
// See https://github.com/k6io/k6/issues/1296 func TestRampingVUsRampDownNoWobble(t *testing.T) { From 97ca7c3c4a82b6e1e8093cbb257f9fd95a9603f9 Mon Sep 17 00:00:00 2001 From: Inanc Gumus Date: Sat, 23 Oct 2021 17:04:45 +0300 Subject: [PATCH 03/10] Rename variable names for rampingVUs executor This commit uses longer variables names to help understand code. --- lib/executor/ramping_vus.go | 52 ++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 0909af9d22c..a9722966055 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -495,32 +495,32 @@ var _ lib.Executor = &RampingVUs{} // number of VUs for the specified stages. func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics) error { rawSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true) - regDur, finalRaw := lib.GetEndOffset(rawSteps) - if !finalRaw { - return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regDur) + regularDuration, isFinal := lib.GetEndOffset(rawSteps) + if !isFinal { + return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regularDuration) } gracefulSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple) - maxDur, finalGraceful := lib.GetEndOffset(gracefulSteps) - if !finalGraceful { - return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDur) + maxDuration, isFinal := lib.GetEndOffset(gracefulSteps) + if !isFinal { + return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDuration) } - startMaxVUs := lib.GetMaxPlannedVUs(gracefulSteps) - startTime, maxDurCtx, regDurCtx, cancel := getDurationContexts(ctx, regDur, maxDur-regDur) + maxVUs := lib.GetMaxPlannedVUs(gracefulSteps) + startTime, maxDurationCtx, regularDurationCtx, cancel := getDurationContexts(ctx, regularDuration, maxDuration-regularDuration) defer cancel() vlv.logger.WithFields(logrus.Fields{ "type": vlv.config.GetType(), "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), - "maxVUs": startMaxVUs, - "duration": regDur, + "maxVUs": maxVUs, + "duration": regularDuration, "numStages": len(vlv.config.Stages), }).Debug("Starting executor run...") runState := &rampingVUsRunState{ executor: vlv, wg: new(sync.WaitGroup), - vuHandles: make([]*vuHandle, startMaxVUs), - maxVUs: startMaxVUs, + vuHandles: make([]*vuHandle, maxVUs), + maxVUs: maxVUs, activeVUsCount: new(int64), started: startTime, rawSteps: rawSteps, @@ -528,18 +528,18 @@ func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ runIteration: getIterationRunner(vlv.executionState, vlv.logger), } - progressFn := runState.makeProgressFn(regDur) - maxDurCtx = lib.WithScenarioState(maxDurCtx, &lib.ScenarioState{ + progressFn := runState.makeProgressFn(regularDuration) + maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{ Name: vlv.config.Name, Executor: vlv.config.Type, StartTime: runState.started, ProgressFn: progressFn, }) vlv.progress.Modify(pb.WithProgress(progressFn)) - go trackProgress(ctx, maxDurCtx, regDurCtx, vlv, progressFn) + go trackProgress(ctx, maxDurationCtx, regularDurationCtx, vlv, progressFn) defer runState.wg.Wait() - runState.populateVUHandles(maxDurCtx, cancel) + runState.populateVUHandles(maxDurationCtx, cancel) for i := uint64(0); i < 
runState.maxVUs; i++ { go runState.vuHandles[i].runLoopsIfPossible(runState.runIteration) } @@ -564,19 +564,19 @@ type rampingVUsRunState struct { runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration } -func (rs rampingVUsRunState) makeProgressFn(total time.Duration) (progressFn func() (float64, []string)) { +func (rs rampingVUsRunState) makeProgressFn(regular time.Duration) (progressFn func() (float64, []string)) { vusFmt := pb.GetFixedLengthIntFormat(int64(rs.maxVUs)) - regDuration := pb.GetFixedLengthDuration(total, total) + regularDuration := pb.GetFixedLengthDuration(regular, regular) return func() (float64, []string) { spent := time.Since(rs.started) cur := atomic.LoadInt64(rs.activeVUsCount) progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", cur, rs.maxVUs) - if spent > total { - return 1, []string{progVUs, total.String()} + if spent > regular { + return 1, []string{progVUs, regular.String()} } - progDur := pb.GetFixedLengthDuration(spent, total) + "/" + regDuration - return float64(spent) / float64(total), []string{progVUs, progDur} + status := pb.GetFixedLengthDuration(spent, regular) + "/" + regularDuration + return float64(spent) / float64(regular), []string{progVUs, status} } } @@ -611,7 +611,7 @@ func (rs rampingVUsRunState) handleVUs(ctx context.Context) { // giving rawSteps precedence. // we stop iterating once rawSteps are over as we need to run the remaining // gracefulSteps concurrently while waiting for VUs to stop in order to not wait until - // the end of gracefulStop (= maxDur-regDur) timeouts + // the end of gracefulStop (= maxDuration-regularDuration) timeouts var ( handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy() handleNewScheduledVUs = rs.scheduledVUsHandlerStrategy() @@ -676,12 +676,12 @@ func (rs rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionSte // waiter returns a function that will sleep/wait for the required time since the startTime and then // return. If the context was done before that it will return true otherwise it will return false // TODO use elsewhere -// TODO set startTime here? +// TODO set start here? // TODO move it to a struct type or something and benchmark if that makes a difference -func waiter(ctx context.Context, startTime time.Time) func(offset time.Duration) bool { +func waiter(ctx context.Context, start time.Time) func(offset time.Duration) bool { timer := time.NewTimer(time.Hour * 24) return func(offset time.Duration) bool { - diff := offset - time.Since(startTime) + diff := offset - time.Since(start) if diff > 0 { // wait until time of event arrives // TODO have a mininum timer.Reset(diff) select { From bdcdcb19d11813fe30aca8969bdcfe31a4ad544c Mon Sep 17 00:00:00 2001 From: Inanc Gumus Date: Sat, 23 Oct 2021 16:55:24 +0300 Subject: [PATCH 04/10] Fix remaining gracefulSteps bug handleVUs and handleRemainingVUs was not sharing the current number of planned VUs in graceful steps. This may cause the latter method to act strangely. It's because handleRemainingVUs needs to know the number of remaining graceful steps so that it can stop the remaining VUs accordinly. Now the handleVUs method returns the number of handled graceful steps. So the handleRemainingVUs method can use this information and hard stop the remaining VUs. 
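Reduced to its essence, the fix is "return how far the merge got, and resume
from there". A simplified, compilable sketch of that shape (step is a
stand-in for lib.ExecutionStep; the waiter and context handling are stubbed
out):

    package sketch

    type step struct {
        offset int    // stands in for TimeOffset
        vus    uint64 // stands in for PlannedVUs
    }

    // mergeSteps walks raw and graceful steps in offset order, preferring raw
    // steps on ties, and reports how many graceful steps it consumed. It
    // assumes the last graceful step is not earlier than the last raw step,
    // which the executor guarantees, so j never runs past len(graceful).
    func mergeSteps(raw, graceful []step, onRaw, onGraceful func(step)) (handledGraceful int) {
        i, j := 0, 0
        for i != len(raw) {
            if graceful[j].offset < raw[i].offset {
                onGraceful(graceful[j])
                j++
            } else {
                onRaw(raw[i])
                i++
            }
        }
        return j
    }

    // runRemaining picks up exactly where mergeSteps stopped instead of
    // replaying graceful steps that were already applied.
    func runRemaining(graceful []step, handled int, onGraceful func(step)) {
        for _, s := range graceful[handled:] {
            onGraceful(s)
        }
    }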
--- lib/executor/ramping_vus.go | 67 ++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index a9722966055..4cc335a2001 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -543,9 +543,21 @@ func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ for i := uint64(0); i < runState.maxVUs; i++ { go runState.vuHandles[i].runLoopsIfPossible(runState.runIteration) } - runState.handleVUs(ctx) - go runState.handleRemainingVUs(ctx) + var ( + handleNewMaxAllowedVUs = runState.maxAllowedVUsHandlerStrategy() + handleNewScheduledVUs = runState.scheduledVUsHandlerStrategy() + ) + handledGracefulSteps := runState.handleVUs( + ctx, + handleNewMaxAllowedVUs, + handleNewScheduledVUs, + ) + go runState.handleRemainingVUs( + ctx, + handleNewMaxAllowedVUs, + handledGracefulSteps, + ) return nil } @@ -606,41 +618,48 @@ func (rs rampingVUsRunState) populateVUHandles(ctx context.Context, cancel func( } } -func (rs rampingVUsRunState) handleVUs(ctx context.Context) { - // iterate over rawSteps and gracefulSteps in order by TimeOffset - // giving rawSteps precedence. - // we stop iterating once rawSteps are over as we need to run the remaining - // gracefulSteps concurrently while waiting for VUs to stop in order to not wait until - // the end of gracefulStop (= maxDuration-regularDuration) timeouts - var ( - handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy() - handleNewScheduledVUs = rs.scheduledVUsHandlerStrategy() - wait = waiter(ctx, rs.started) - ) - for i, j := 0, 0; i != len(rs.rawSteps); { +// handleVUs iterates over rawSteps and gracefulSteps in order according to +// their TimeOffsets, prioritizing rawSteps. It stops iterating once rawSteps +// are over. And it returns the number of handled gracefulSteps. +func (rs rampingVUsRunState) handleVUs( + ctx context.Context, + handleNewMaxAllowedVUs, handleNewScheduledVUs func(lib.ExecutionStep), +) (handledGracefulSteps int) { + wait := waiter(ctx, rs.started) + i, j := 0, 0 + for i != len(rs.rawSteps) { r, g := rs.rawSteps[i], rs.gracefulSteps[j] if g.TimeOffset < r.TimeOffset { - j++ if wait(g.TimeOffset) { - return + break } handleNewMaxAllowedVUs(g) + j++ } else { - i++ if wait(r.TimeOffset) { - return + break } handleNewScheduledVUs(r) + i++ } } + return j } -func (rs rampingVUsRunState) handleRemainingVUs(ctx context.Context) { - var ( - handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy() - wait = waiter(ctx, rs.started) - ) - for _, s := range rs.gracefulSteps { +// handleRemainingVUs runs the remaining gracefulSteps concurrently +// before the gracefulStop timeout period stops VUs. +// +// This way we will have run the gracefulSteps at the same time while +// waiting for the VUs to finish. 
+// +// (gracefulStop = maxDuration-regularDuration) +func (rs rampingVUsRunState) handleRemainingVUs( + ctx context.Context, + handleNewMaxAllowedVUs func(lib.ExecutionStep), + handledGracefulSteps int, +) { + wait := waiter(ctx, rs.started) + for _, s := range rs.gracefulSteps[handledGracefulSteps:] { if wait(s.TimeOffset) { return } From b68fd302695ef0b4886f4ed86efc74770d947483 Mon Sep 17 00:00:00 2001 From: Inanc Gumus Date: Fri, 22 Oct 2021 19:00:28 +0300 Subject: [PATCH 05/10] Refactor: Move steps calculations to Init --- lib/executor/ramping_vus.go | 52 +++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 4cc335a2001..506481a549a 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -469,7 +469,7 @@ func (vlvc RampingVUsConfig) GetExecutionRequirements(et *lib.ExecutionTuple) [] // NewExecutor creates a new RampingVUs executor func (vlvc RampingVUsConfig) NewExecutor(es *lib.ExecutionState, logger *logrus.Entry) (lib.Executor, error) { - return RampingVUs{ + return &RampingVUs{ BaseExecutor: NewBaseExecutor(vlvc, es, logger), config: vlvc, }, nil @@ -486,28 +486,43 @@ func (vlvc RampingVUsConfig) HasWork(et *lib.ExecutionTuple) bool { type RampingVUs struct { *BaseExecutor config RampingVUsConfig + + rawSteps, gracefulSteps []lib.ExecutionStep } // Make sure we implement the lib.Executor interface. var _ lib.Executor = &RampingVUs{} +// Init initializes the rampingVUs executor by precalculating the raw +// and graceful steps. +func (vlv *RampingVUs) Init(_ context.Context) error { + vlv.rawSteps = vlv.config.getRawExecutionSteps( + vlv.executionState.ExecutionTuple, true, + ) + vlv.gracefulSteps = vlv.config.GetExecutionRequirements( + vlv.executionState.ExecutionTuple, + ) + return nil +} + // Run constantly loops through as many iterations as possible on a variable // number of VUs for the specified stages. 
-func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics) error { - rawSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true) - regularDuration, isFinal := lib.GetEndOffset(rawSteps) +func (vlv *RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics) error { + regularDuration, isFinal := lib.GetEndOffset(vlv.rawSteps) if !isFinal { return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regularDuration) } - gracefulSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple) - maxDuration, isFinal := lib.GetEndOffset(gracefulSteps) + maxDuration, isFinal := lib.GetEndOffset(vlv.gracefulSteps) if !isFinal { return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDuration) } - maxVUs := lib.GetMaxPlannedVUs(gracefulSteps) - startTime, maxDurationCtx, regularDurationCtx, cancel := getDurationContexts(ctx, regularDuration, maxDuration-regularDuration) + startTime, maxDurationCtx, regularDurationCtx, cancel := getDurationContexts( + ctx, regularDuration, maxDuration-regularDuration, + ) defer cancel() + maxVUs := lib.GetMaxPlannedVUs(vlv.gracefulSteps) + vlv.logger.WithFields(logrus.Fields{ "type": vlv.config.GetType(), "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), @@ -523,8 +538,6 @@ func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ maxVUs: maxVUs, activeVUsCount: new(int64), started: startTime, - rawSteps: rawSteps, - gracefulSteps: gracefulSteps, runIteration: getIterationRunner(vlv.executionState, vlv.logger), } @@ -565,13 +578,12 @@ func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ // of the ramping VUs executor. It is used to track and modify various // details of the execution. 
type rampingVUsRunState struct { - executor RampingVUs - vuHandles []*vuHandle // handles for manipulating and tracking all of the VUs - maxVUs uint64 // the scaled number of initially configured MaxVUs - activeVUsCount *int64 // the current number of active VUs, used only for the progress display - started time.Time - rawSteps, gracefulSteps []lib.ExecutionStep - wg *sync.WaitGroup + executor *RampingVUs + vuHandles []*vuHandle // handles for manipulating and tracking all of the VUs + maxVUs uint64 // the scaled number of initially configured MaxVUs + activeVUsCount *int64 // the current number of active VUs, used only for the progress display + started time.Time + wg *sync.WaitGroup runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration } @@ -627,8 +639,8 @@ func (rs rampingVUsRunState) handleVUs( ) (handledGracefulSteps int) { wait := waiter(ctx, rs.started) i, j := 0, 0 - for i != len(rs.rawSteps) { - r, g := rs.rawSteps[i], rs.gracefulSteps[j] + for i != len(rs.executor.rawSteps) { + r, g := rs.executor.rawSteps[i], rs.executor.gracefulSteps[j] if g.TimeOffset < r.TimeOffset { if wait(g.TimeOffset) { break @@ -659,7 +671,7 @@ func (rs rampingVUsRunState) handleRemainingVUs( handledGracefulSteps int, ) { wait := waiter(ctx, rs.started) - for _, s := range rs.gracefulSteps[handledGracefulSteps:] { + for _, s := range rs.executor.gracefulSteps[handledGracefulSteps:] { if wait(s.TimeOffset) { return } From 9836174b0bc69741c92659ec69ca955342195929 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0nan=C3=A7=20G=C3=BCm=C3=BC=C5=9F?= Date: Thu, 18 Nov 2021 16:39:09 +0300 Subject: [PATCH 06/10] Add minor comments to Ramping VU executor --- lib/executor/ramping_vus.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 506481a549a..da5a12cf0dc 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -680,7 +680,7 @@ func (rs rampingVUsRunState) handleRemainingVUs( } func (rs rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionStep) { - var cur uint64 + var cur uint64 // current number of planned graceful VUs return func(graceful lib.ExecutionStep) { pv := graceful.PlannedVUs for ; pv < cur; cur-- { @@ -691,7 +691,7 @@ func (rs rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionSt } func (rs rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionStep) { - var cur uint64 + var cur uint64 // current number of planned raw VUs return func(raw lib.ExecutionStep) { pv := raw.PlannedVUs for ; cur < pv; cur++ { From a9b039fd91653936d901a9dc9c093438f41005c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0nan=C3=A7=20G=C3=BCm=C3=BC=C5=9F?= Date: Thu, 18 Nov 2021 16:39:29 +0300 Subject: [PATCH 07/10] Remove unnecessary assignment in Ramping VU --- lib/executor/ramping_vus.go | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index da5a12cf0dc..2e9b81dc5f5 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -700,7 +700,6 @@ func (rs rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionSte for ; pv < cur; cur-- { rs.vuHandles[cur-1].gracefulStop() } - cur = pv } } From 1eebf1785deb7c0b4b3174538adb1fde92702670 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0nan=C3=A7=20G=C3=BCm=C3=BC=C5=9F?= Date: Thu, 18 Nov 2021 18:13:40 +0300 Subject: [PATCH 08/10] Method renamings in Ramping VU executor + Also changes the value 
receiver type to a pointer receiver for rampingVUsRunState. --- lib/executor/ramping_vus.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 2e9b81dc5f5..ad785a2e8e6 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -533,7 +533,7 @@ func (vlv *RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, runState := &rampingVUsRunState{ executor: vlv, - wg: new(sync.WaitGroup), + wg: sync.WaitGroup{}, vuHandles: make([]*vuHandle, maxVUs), maxVUs: maxVUs, activeVUsCount: new(int64), @@ -561,12 +561,12 @@ func (vlv *RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, handleNewMaxAllowedVUs = runState.maxAllowedVUsHandlerStrategy() handleNewScheduledVUs = runState.scheduledVUsHandlerStrategy() ) - handledGracefulSteps := runState.handleVUs( + handledGracefulSteps := runState.iterateSteps( ctx, handleNewMaxAllowedVUs, handleNewScheduledVUs, ) - go runState.handleRemainingVUs( + go runState.runRemainingGracefulSteps( ctx, handleNewMaxAllowedVUs, handledGracefulSteps, @@ -583,12 +583,12 @@ type rampingVUsRunState struct { maxVUs uint64 // the scaled number of initially configured MaxVUs activeVUsCount *int64 // the current number of active VUs, used only for the progress display started time.Time - wg *sync.WaitGroup + wg sync.WaitGroup runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration } -func (rs rampingVUsRunState) makeProgressFn(regular time.Duration) (progressFn func() (float64, []string)) { +func (rs *rampingVUsRunState) makeProgressFn(regular time.Duration) (progressFn func() (float64, []string)) { vusFmt := pb.GetFixedLengthIntFormat(int64(rs.maxVUs)) regularDuration := pb.GetFixedLengthDuration(regular, regular) @@ -604,7 +604,7 @@ func (rs rampingVUsRunState) makeProgressFn(regular time.Duration) (progressFn f } } -func (rs rampingVUsRunState) populateVUHandles(ctx context.Context, cancel func()) { +func (rs *rampingVUsRunState) populateVUHandles(ctx context.Context, cancel func()) { getVU := func() (lib.InitializedVU, error) { pvu, err := rs.executor.executionState.GetPlannedVU(rs.executor.logger, false) if err != nil { @@ -630,10 +630,10 @@ func (rs rampingVUsRunState) populateVUHandles(ctx context.Context, cancel func( } } -// handleVUs iterates over rawSteps and gracefulSteps in order according to +// iterateSteps iterates over rawSteps and gracefulSteps in order according to // their TimeOffsets, prioritizing rawSteps. It stops iterating once rawSteps // are over. And it returns the number of handled gracefulSteps. -func (rs rampingVUsRunState) handleVUs( +func (rs *rampingVUsRunState) iterateSteps( ctx context.Context, handleNewMaxAllowedVUs, handleNewScheduledVUs func(lib.ExecutionStep), ) (handledGracefulSteps int) { @@ -658,14 +658,14 @@ func (rs rampingVUsRunState) handleVUs( return j } -// handleRemainingVUs runs the remaining gracefulSteps concurrently +// runRemainingGracefulSteps runs the remaining gracefulSteps concurrently // before the gracefulStop timeout period stops VUs. // // This way we will have run the gracefulSteps at the same time while // waiting for the VUs to finish. 
// // (gracefulStop = maxDuration-regularDuration) -func (rs rampingVUsRunState) handleRemainingVUs( +func (rs *rampingVUsRunState) runRemainingGracefulSteps( ctx context.Context, handleNewMaxAllowedVUs func(lib.ExecutionStep), handledGracefulSteps int, @@ -679,7 +679,7 @@ func (rs rampingVUsRunState) handleRemainingVUs( } } -func (rs rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionStep) { +func (rs *rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionStep) { var cur uint64 // current number of planned graceful VUs return func(graceful lib.ExecutionStep) { pv := graceful.PlannedVUs @@ -690,7 +690,7 @@ func (rs rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionSt } } -func (rs rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionStep) { +func (rs *rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionStep) { var cur uint64 // current number of planned raw VUs return func(raw lib.ExecutionStep) { pv := raw.PlannedVUs From c0f7e06dc6d15611ba9c4866beef62a66ab10e21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0nan=C3=A7=20G=C3=BCm=C3=BC=C5=9F?= Date: Thu, 18 Nov 2021 18:16:51 +0300 Subject: [PATCH 09/10] Remove unnecessary field assignment in Ramping VU --- lib/executor/ramping_vus.go | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index ad785a2e8e6..40bd422fea0 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -533,7 +533,6 @@ func (vlv *RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, runState := &rampingVUsRunState{ executor: vlv, - wg: sync.WaitGroup{}, vuHandles: make([]*vuHandle, maxVUs), maxVUs: maxVUs, activeVUsCount: new(int64), From 4bb561d7a1240b671536cc48abe66b57ed9314c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=B0nan=C3=A7=20G=C3=BCm=C3=BC=C5=9F?= Date: Thu, 18 Nov 2021 18:42:34 +0300 Subject: [PATCH 10/10] Move runLoopsIfPossible goroutine creation to a function --- lib/executor/ramping_vus.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 40bd422fea0..18a07b60306 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -551,10 +551,9 @@ func (vlv *RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, go trackProgress(ctx, maxDurationCtx, regularDurationCtx, vlv, progressFn) defer runState.wg.Wait() - runState.populateVUHandles(maxDurationCtx, cancel) - for i := uint64(0); i < runState.maxVUs; i++ { - go runState.vuHandles[i].runLoopsIfPossible(runState.runIteration) - } + // this will populate stopped VUs and run runLoopsIfPossible on each VU + // handle in a new goroutine + runState.runLoopsIfPossible(maxDurationCtx, cancel) var ( handleNewMaxAllowedVUs = runState.maxAllowedVUsHandlerStrategy() @@ -603,7 +602,7 @@ func (rs *rampingVUsRunState) makeProgressFn(regular time.Duration) (progressFn } } -func (rs *rampingVUsRunState) populateVUHandles(ctx context.Context, cancel func()) { +func (rs *rampingVUsRunState) runLoopsIfPossible(ctx context.Context, cancel func()) { getVU := func() (lib.InitializedVU, error) { pvu, err := rs.executor.executionState.GetPlannedVU(rs.executor.logger, false) if err != nil { @@ -626,6 +625,7 @@ func (rs *rampingVUsRunState) populateVUHandles(ctx context.Context, cancel func rs.vuHandles[i] = newStoppedVUHandle( ctx, getVU, returnVU, rs.executor.nextIterationCounters, &rs.executor.config.BaseConfig, 
rs.executor.logger.WithField("vuNum", i)) + go rs.vuHandles[i].runLoopsIfPossible(rs.runIteration) } }
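
The pattern this series converges on: each handler strategy is a closure that
owns its running count and reconciles it toward every newly planned value.
Reduced to a compilable sketch (the handle interface is a stand-in for the
executor's vuHandle, not its real API):

    package sketch

    type handle interface {
        start() error
        gracefulStop()
    }

    // scheduledVUsHandler mirrors scheduledVUsHandlerStrategy: the closure
    // keeps the current number of started handles and moves it toward each
    // new target, starting or gracefully stopping one handle at a time.
    func scheduledVUsHandler(handles []handle) func(target uint64) {
        var cur uint64
        return func(target uint64) {
            for ; cur < target; cur++ {
                _ = handles[cur].start() // error handling elided, as in the TODO above
            }
            for ; target < cur; cur-- {
                handles[cur-1].gracefulStop()
            }
        }
    }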