diff --git a/lib/executor/ramping_vus.go b/lib/executor/ramping_vus.go index 28dc43970a4..fe4e0be4390 100644 --- a/lib/executor/ramping_vus.go +++ b/lib/executor/ramping_vus.go @@ -548,12 +548,12 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps) gracefulStop := maxDuration - regularDuration - activeVUs := &sync.WaitGroup{} - defer activeVUs.Wait() - startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, regularDuration, gracefulStop) defer cancel() + activeVUs := &sync.WaitGroup{} + defer activeVUs.Wait() + // Make sure the log and the progress bar have accurate information vlv.logger.WithFields(logrus.Fields{ "type": vlv.config.GetType(), "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), "maxVUs": maxVUs, @@ -588,6 +588,7 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) } else { activeVUs.Add(1) atomic.AddInt64(activeVUsCount, 1) + vlv.executionState.ModCurrentlyActiveVUsCount(+1) } return initVU, err } @@ -595,6 +596,7 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) vlv.executionState.ReturnVU(initVU, false) atomic.AddInt64(activeVUsCount, -1) activeVUs.Done() + vlv.executionState.ModCurrentlyActiveVUsCount(-1) } vuHandles := make([]*vuHandle, maxVUs) @@ -606,9 +608,6 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) vuHandles[i] = vuHandle } - rawStepEvents := lib.StreamExecutionSteps(ctx, startTime, rawExecutionSteps, true) - gracefulLimitEvents := lib.StreamExecutionSteps(ctx, startTime, gracefulExecutionSteps, false) - // 0 <= currentScheduledVUs <= currentMaxAllowedVUs <= maxVUs var currentScheduledVUs, currentMaxAllowedVUs uint64 @@ -616,12 +615,10 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) if newScheduledVUs > currentScheduledVUs { for vuNum := currentScheduledVUs; vuNum < newScheduledVUs; vuNum++ { _ = vuHandles[vuNum].start() // TODO handle error - vlv.executionState.ModCurrentlyActiveVUsCount(+1) } } else { for vuNum := newScheduledVUs; vuNum < currentScheduledVUs; vuNum++ { vuHandles[vuNum].gracefulStop() - vlv.executionState.ModCurrentlyActiveVUsCount(-1) } } currentScheduledVUs = newScheduledVUs @@ -636,40 +633,59 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer) currentMaxAllowedVUs = newMaxAllowedVUs } - handleAllRawSteps := func() bool { - for { - select { - case step, ok := <-rawStepEvents: - if !ok { - return true - } - handleNewScheduledVUs(step.PlannedVUs) - case step := <-gracefulLimitEvents: - if step.PlannedVUs > currentMaxAllowedVUs { - // Handle the case where a value is read from the - // gracefulLimitEvents channel before rawStepEvents - handleNewScheduledVUs(step.PlannedVUs) - } - handleNewMaxAllowedVUs(step.PlannedVUs) - case <-ctx.Done(): - return false + wait := waiter(ctx, startTime) + // iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset + // giving rawExecutionSteps precedence. + // we stop iterating once rawExecutionSteps are over as we need to run the remaining + // gracefulExecutionSteps concurrently while waiting for VUs to stop in order to not wait until + // the end of gracefulStop timeouts + i, j := 0, 0 + for i != len(rawExecutionSteps) { + if rawExecutionSteps[i].TimeOffset > gracefulExecutionSteps[j].TimeOffset { + if wait(gracefulExecutionSteps[j].TimeOffset) { + return + } + handleNewMaxAllowedVUs(gracefulExecutionSteps[j].PlannedVUs) + j++ + } else { + if wait(rawExecutionSteps[i].TimeOffset) { + return } + handleNewScheduledVUs(rawExecutionSteps[i].PlannedVUs) + i++ } } - if handleAllRawSteps() { - // Handle any remaining graceful stops - go func() { - for { - select { - case step := <-gracefulLimitEvents: - handleNewMaxAllowedVUs(step.PlannedVUs) - case <-maxDurationCtx.Done(): - return - } + go func() { // iterate over the remaining gracefulExecutionSteps + for _, step := range gracefulExecutionSteps[j:] { + if wait(step.TimeOffset) { + return } - }() - } + handleNewMaxAllowedVUs(step.PlannedVUs) + } + }() return nil } + +// waiter returns a function that will sleep/wait for the required time since the startTime and then +// return. If the context was done before that it will return true otherwise it will return false +// TODO use elsewhere +// TODO set startTime here? +// TODO move it to a struct type or something and benchmark if that makes a difference +func waiter(ctx context.Context, startTime time.Time) func(offset time.Duration) bool { + timer := time.NewTimer(time.Hour * 24) + return func(offset time.Duration) bool { + offsetDiff := offset - time.Since(startTime) + if offsetDiff > 0 { // wait until time of event arrives // TODO have a mininum + timer.Reset(offsetDiff) + select { + case <-ctx.Done(): + return true // exit if context is cancelled + case <-timer.C: + // now we do a step + } + } + return false + } +} diff --git a/lib/executor/ramping_vus_test.go b/lib/executor/ramping_vus_test.go index 91af6c8bc71..bdd2d710f4d 100644 --- a/lib/executor/ramping_vus_test.go +++ b/lib/executor/ramping_vus_test.go @@ -81,7 +81,7 @@ func TestRampingVUsRun(t *testing.T) { sampleTimes := []time.Duration{ 500 * time.Millisecond, 1000 * time.Millisecond, - 800 * time.Millisecond, + 900 * time.Millisecond, } errCh := make(chan error) @@ -99,6 +99,167 @@ func TestRampingVUsRun(t *testing.T) { assert.Equal(t, int64(29), atomic.LoadInt64(&iterCount)) } +func TestRampingVUsGracefulStopWaits(t *testing.T) { + t.Parallel() + + config := RampingVUsConfig{ + BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(time.Second)}, + StartVUs: null.IntFrom(1), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(1 * time.Second), + Target: null.IntFrom(1), + }, + }, + } + + var ( + started = make(chan struct{}) // the iteration started + stopped = make(chan struct{}) // the iteration stopped + stop = make(chan struct{}) // the itearation should stop + ) + + et, err := lib.NewExecutionTuple(nil, nil) + require.NoError(t, err) + es := lib.NewExecutionState(lib.Options{}, et, 10, 50) + ctx, cancel, executor, _ := setupExecutor( + t, config, es, + simpleRunner(func(ctx context.Context) error { + close(started) + defer close(stopped) + select { + case <-ctx.Done(): + t.Fatal("The iterations should've ended before the context") + case <-stop: + } + return nil + }), + ) + defer cancel() + errCh := make(chan error) + go func() { errCh <- executor.Run(ctx, nil) }() + + <-started + // 500 milliseconds more then the duration and 500 less then the gracefulStop + time.Sleep(time.Millisecond * 1500) + close(stop) + <-stopped + + require.NoError(t, <-errCh) +} + +func TestRampingVUsGracefulStopStops(t *testing.T) { + t.Parallel() + + config := RampingVUsConfig{ + BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(time.Second)}, + StartVUs: null.IntFrom(1), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(1 * time.Second), + Target: null.IntFrom(1), + }, + }, + } + + var ( + started = make(chan struct{}) // the iteration started + stopped = make(chan struct{}) // the iteration stopped + stop = make(chan struct{}) // the itearation should stop + ) + + et, err := lib.NewExecutionTuple(nil, nil) + require.NoError(t, err) + es := lib.NewExecutionState(lib.Options{}, et, 10, 50) + ctx, cancel, executor, _ := setupExecutor( + t, config, es, + simpleRunner(func(ctx context.Context) error { + close(started) + defer close(stopped) + select { + case <-ctx.Done(): + case <-stop: + t.Fatal("The iterations shouldn't have ended before the context") + } + return nil + }), + ) + defer cancel() + errCh := make(chan error) + go func() { errCh <- executor.Run(ctx, nil) }() + + <-started + // 500 milliseconds more then the gracefulStop + duration + time.Sleep(time.Millisecond * 2500) + close(stop) + <-stopped + + require.NoError(t, <-errCh) +} + +func TestRampingVUsGracefulRampDown(t *testing.T) { + t.Parallel() + + config := RampingVUsConfig{ + BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(5 * time.Second)}, + StartVUs: null.IntFrom(2), + GracefulRampDown: types.NullDurationFrom(5 * time.Second), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(1 * time.Second), + Target: null.IntFrom(2), + }, + { + Duration: types.NullDurationFrom(1 * time.Second), + Target: null.IntFrom(0), + }, + }, + } + + var ( + started = make(chan struct{}) // the iteration started + stopped = make(chan struct{}) // the iteration stopped + stop = make(chan struct{}) // the itearation should stop + ) + + et, err := lib.NewExecutionTuple(nil, nil) + require.NoError(t, err) + es := lib.NewExecutionState(lib.Options{}, et, 10, 50) + ctx, cancel, executor, _ := setupExecutor( + t, config, es, + simpleRunner(func(ctx context.Context) error { + if lib.GetState(ctx).Vu == 1 { // the first VU will wait here to do stuff + close(started) + defer close(stopped) + select { + case <-ctx.Done(): + t.Fatal("The iterations shouldn't have ended before the context") + case <-stop: + } + } else { // all other (1) VUs will just sleep long enough + time.Sleep(2500 * time.Millisecond) + } + return nil + }), + ) + defer cancel() + errCh := make(chan error) + go func() { errCh <- executor.Run(ctx, nil) }() + + <-started + // 500 milliseconds more then the gracefulRampDown + duration + time.Sleep(2500 * time.Millisecond) + close(stop) + <-stopped + + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(time.Second): // way too much time + t.Fatal("Execution should've ended already") + } +} + // Ensure there's no wobble of VUs during graceful ramp-down, without segments. // See https://github.com/loadimpact/k6/issues/1296 func TestRampingVUsRampDownNoWobble(t *testing.T) { @@ -126,7 +287,7 @@ func TestRampingVUsRampDownNoWobble(t *testing.T) { ctx, cancel, executor, _ := setupExecutor( t, config, es, simpleRunner(func(ctx context.Context) error { - time.Sleep(1 * time.Second) + time.Sleep(500 * time.Millisecond) return nil }), ) @@ -136,7 +297,10 @@ func TestRampingVUsRampDownNoWobble(t *testing.T) { 100 * time.Millisecond, 3000 * time.Millisecond, } - const rampDownSamples = 50 + const rampDownSampleTime = 50 * time.Millisecond + var rampDownSamples = int(time.Duration( + config.Stages[len(config.Stages)-1].Duration.Duration+config.GracefulRampDown.Duration, + ) / rampDownSampleTime) errCh := make(chan error) go func() { errCh <- executor.Run(ctx, nil) }() @@ -149,7 +313,7 @@ func TestRampingVUsRampDownNoWobble(t *testing.T) { // Sample ramp-down at a higher rate for i := len(sampleTimes); i < rampDownSamples; i++ { - time.Sleep(50 * time.Millisecond) + time.Sleep(rampDownSampleTime) result[i] = es.GetCurrentlyActiveVUsCount() } diff --git a/lib/helpers.go b/lib/helpers.go index cedcbe0fa7e..8d2e133528a 100644 --- a/lib/helpers.go +++ b/lib/helpers.go @@ -22,7 +22,6 @@ package lib import ( "bytes" - "context" "encoding/json" "fmt" "strings" @@ -113,47 +112,3 @@ func ConcatErrors(errors []error, separator string) string { } return strings.Join(errStrings, separator) } - -// StreamExecutionSteps launches a new goroutine and emits all execution steps -// at their appropriate time offsets over the returned unbuffered channel. If -// closeChanWhenDone is specified, it will close the channel after it sends the -// last step. If it isn't, or if the context is cancelled, the internal -// goroutine will be stopped, *but the channel will remain open*! -// -// As usual, steps in the supplied slice have to be sorted by their TimeOffset -// values in an ascending order. Of course, multiple events can have the same -// time offset (incl. 0). -func StreamExecutionSteps( - ctx context.Context, startTime time.Time, steps []ExecutionStep, closeChanWhenDone bool, -) <-chan ExecutionStep { - ch := make(chan ExecutionStep) - go func() { - for _, step := range steps { - offsetDiff := step.TimeOffset - time.Since(startTime) - if offsetDiff > 0 { // wait until time of event arrives - select { - case <-ctx.Done(): - return // exit if context is cancelled - case <-time.After(offsetDiff): //TODO: reuse a timer? - // do nothing - } - } - select { - case <-ctx.Done(): - // exit if context is cancelled - return - case ch <- step: - // ... otherwise, just send the step - the out channel is - // unbuffered, so we don't need to worry whether the other side - // will keep reading after the context is done. - } - } - - // Close the channel only if all steps were sent successfully (i.e. the - // parent context didn't die) and we were instructed to do so. - if closeChanWhenDone { - close(ch) - } - }() - return ch -}