
Commit

merge steps in VLV executors for more stability (#1496)
Also fix VLV not respecting gracefulStop since 270fd91, add a test for
it, and try to make some other VLV tests more stable.

The original code (pre 270fd91) went through all the raw steps
(ramp up/down) and the graceful stop ones concurrently, and when the
raw steps ended it would return from Run, but it would start a
goroutine to go through the remaining graceful stop steps. The most
important of those (at least in my current understanding) is the last
one, which is the actual gracefulStop end that makes all VUs stop. The
reason this actually worked is that it also waited for all activeVUs to
have stopped being active (been returned, in the new terminology)
before it actually cancelled the context for all of them. So if the VUs
managed to end their iterations in that time, the executor would end
earlier; if not, the gracefulStop would make them.

270fd91 broke the returning of VUs, which led to the executor never
returning, and moving the cancel "before" the waiting for activeVUs
fixed that at the cost of gracefulStop not being taken into account;
but with no tests, nobody noticed.

Here I basically revert that, especially because vuHandle now returns
VUs when stopped.

fixes #1473
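For illustration, the "merge" in the title boils down to a two-pointer walk over the raw and graceful step slices, both of which are already sorted by TimeOffset, with the raw steps taking precedence on ties. Below is a minimal sketch of that pattern; the step struct and the handler callbacks are simplified stand-ins, not the actual k6 types.

package main

import (
	"fmt"
	"time"
)

// step is a simplified stand-in for lib.ExecutionStep.
type step struct {
	TimeOffset time.Duration
	PlannedVUs uint64
}

// mergeSteps walks raw and graceful steps in TimeOffset order, giving the raw
// steps precedence on equal offsets, and stops once the raw steps run out.
// It returns the index of the first unprocessed graceful step.
func mergeSteps(raw, graceful []step, onRaw, onGraceful func(uint64)) int {
	i, j := 0, 0
	for i != len(raw) {
		if j < len(graceful) && raw[i].TimeOffset > graceful[j].TimeOffset {
			onGraceful(graceful[j].PlannedVUs)
			j++
		} else {
			onRaw(raw[i].PlannedVUs)
			i++
		}
	}
	return j // the caller deals with graceful[j:] concurrently, as in the diff below
}

func main() {
	raw := []step{{0, 1}, {time.Second, 2}, {2 * time.Second, 0}}
	graceful := []step{{0, 1}, {time.Second, 2}, {5 * time.Second, 0}}
	j := mergeSteps(raw, graceful,
		func(v uint64) { fmt.Println("scheduled VUs:", v) },
		func(v uint64) { fmt.Println("max allowed VUs:", v) },
	)
	fmt.Println("remaining graceful steps:", len(graceful)-j)
}

In the real executor the loop additionally waits until each step's TimeOffset is reached (see the waiter helper in the diff below) and hands the leftover graceful steps to a goroutine, so Run can return as soon as all VUs have been returned.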

Co-authored-by: na-- <[email protected]>
mstoykov and na-- authored Jun 10, 2020
1 parent 54a1a45 commit 0486c11
Showing 3 changed files with 221 additions and 86 deletions.
90 changes: 53 additions & 37 deletions lib/executor/ramping_vus.go
@@ -548,12 +548,12 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer)
maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps)
gracefulStop := maxDuration - regularDuration

activeVUs := &sync.WaitGroup{}
defer activeVUs.Wait()

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, regularDuration, gracefulStop)
defer cancel()

activeVUs := &sync.WaitGroup{}
defer activeVUs.Wait()

// Make sure the log and the progress bar have accurate information
vlv.logger.WithFields(logrus.Fields{
"type": vlv.config.GetType(), "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), "maxVUs": maxVUs,
@@ -588,13 +588,15 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer)
} else {
activeVUs.Add(1)
atomic.AddInt64(activeVUsCount, 1)
vlv.executionState.ModCurrentlyActiveVUsCount(+1)
}
return initVU, err
}
returnVU := func(initVU lib.InitializedVU) {
vlv.executionState.ReturnVU(initVU, false)
atomic.AddInt64(activeVUsCount, -1)
activeVUs.Done()
vlv.executionState.ModCurrentlyActiveVUsCount(-1)
}

vuHandles := make([]*vuHandle, maxVUs)
@@ -606,22 +608,17 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer)
vuHandles[i] = vuHandle
}

rawStepEvents := lib.StreamExecutionSteps(ctx, startTime, rawExecutionSteps, true)
gracefulLimitEvents := lib.StreamExecutionSteps(ctx, startTime, gracefulExecutionSteps, false)

// 0 <= currentScheduledVUs <= currentMaxAllowedVUs <= maxVUs
var currentScheduledVUs, currentMaxAllowedVUs uint64

handleNewScheduledVUs := func(newScheduledVUs uint64) {
if newScheduledVUs > currentScheduledVUs {
for vuNum := currentScheduledVUs; vuNum < newScheduledVUs; vuNum++ {
_ = vuHandles[vuNum].start() // TODO handle error
vlv.executionState.ModCurrentlyActiveVUsCount(+1)
}
} else {
for vuNum := newScheduledVUs; vuNum < currentScheduledVUs; vuNum++ {
vuHandles[vuNum].gracefulStop()
vlv.executionState.ModCurrentlyActiveVUsCount(-1)
}
}
currentScheduledVUs = newScheduledVUs
@@ -636,40 +633,59 @@ func (vlv RampingVUs) Run(ctx context.Context, out chan<- stats.SampleContainer)
currentMaxAllowedVUs = newMaxAllowedVUs
}

handleAllRawSteps := func() bool {
for {
select {
case step, ok := <-rawStepEvents:
if !ok {
return true
}
handleNewScheduledVUs(step.PlannedVUs)
case step := <-gracefulLimitEvents:
if step.PlannedVUs > currentMaxAllowedVUs {
// Handle the case where a value is read from the
// gracefulLimitEvents channel before rawStepEvents
handleNewScheduledVUs(step.PlannedVUs)
}
handleNewMaxAllowedVUs(step.PlannedVUs)
case <-ctx.Done():
return false
wait := waiter(ctx, startTime)
// iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset
// giving rawExecutionSteps precedence.
// we stop iterating once the rawExecutionSteps are over, as we need to run the remaining
// gracefulExecutionSteps concurrently while waiting for VUs to stop, so that we don't wait until
// the end of the gracefulStop timeouts
i, j := 0, 0
for i != len(rawExecutionSteps) {
if rawExecutionSteps[i].TimeOffset > gracefulExecutionSteps[j].TimeOffset {
if wait(gracefulExecutionSteps[j].TimeOffset) {
return
}
handleNewMaxAllowedVUs(gracefulExecutionSteps[j].PlannedVUs)
j++
} else {
if wait(rawExecutionSteps[i].TimeOffset) {
return
}
handleNewScheduledVUs(rawExecutionSteps[i].PlannedVUs)
i++
}
}

if handleAllRawSteps() {
// Handle any remaining graceful stops
go func() {
for {
select {
case step := <-gracefulLimitEvents:
handleNewMaxAllowedVUs(step.PlannedVUs)
case <-maxDurationCtx.Done():
return
}
go func() { // iterate over the remaining gracefulExecutionSteps
for _, step := range gracefulExecutionSteps[j:] {
if wait(step.TimeOffset) {
return
}
}()
}
handleNewMaxAllowedVUs(step.PlannedVUs)
}
}()

return nil
}

// waiter returns a function that will sleep/wait for the required time since the startTime and then
// return. If the context is done before that, it will return true; otherwise it will return false
// TODO use elsewhere
// TODO set startTime here?
// TODO move it to a struct type or something and benchmark if that makes a difference
func waiter(ctx context.Context, startTime time.Time) func(offset time.Duration) bool {
timer := time.NewTimer(time.Hour * 24)
return func(offset time.Duration) bool {
offsetDiff := offset - time.Since(startTime)
if offsetDiff > 0 { // wait until time of event arrives // TODO have a minimum
timer.Reset(offsetDiff)
select {
case <-ctx.Done():
return true // exit if context is cancelled
case <-timer.C:
// now we do a step
}
}
return false
}
}
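As a rough usage sketch (not the actual call sites), the helper above is a reusable "block until startTime+offset, unless the context ends first" primitive; it is what lets the merged loop pace the execution steps while still bailing out promptly on cancellation. The names below are hypothetical stand-ins.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntil mirrors the waiter helper above in a stand-alone form: it blocks
// until startTime+offset, or reports true if ctx ends first.
func waitUntil(ctx context.Context, startTime time.Time) func(time.Duration) bool {
	timer := time.NewTimer(24 * time.Hour)
	return func(offset time.Duration) bool {
		d := offset - time.Since(startTime)
		if d > 0 {
			timer.Reset(d)
			select {
			case <-ctx.Done():
				return true
			case <-timer.C:
			}
		}
		return false
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()

	wait := waitUntil(ctx, time.Now())
	for _, offset := range []time.Duration{
		100 * time.Millisecond,
		200 * time.Millisecond,
		300 * time.Millisecond, // past the context deadline, so wait reports true
	} {
		if wait(offset) {
			fmt.Println("context done before", offset)
			return
		}
		fmt.Println("event at", offset)
	}
}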
172 changes: 168 additions & 4 deletions lib/executor/ramping_vus_test.go
@@ -81,7 +81,7 @@ func TestRampingVUsRun(t *testing.T) {
sampleTimes := []time.Duration{
500 * time.Millisecond,
1000 * time.Millisecond,
800 * time.Millisecond,
900 * time.Millisecond,
}

errCh := make(chan error)
@@ -99,6 +99,167 @@ func TestRampingVUsRun(t *testing.T) {
assert.Equal(t, int64(29), atomic.LoadInt64(&iterCount))
}

func TestRampingVUsGracefulStopWaits(t *testing.T) {
t.Parallel()

config := RampingVUsConfig{
BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(time.Second)},
StartVUs: null.IntFrom(1),
Stages: []Stage{
{
Duration: types.NullDurationFrom(1 * time.Second),
Target: null.IntFrom(1),
},
},
}

var (
started = make(chan struct{}) // the iteration started
stopped = make(chan struct{}) // the iteration stopped
stop = make(chan struct{}) // the iteration should stop
)

et, err := lib.NewExecutionTuple(nil, nil)
require.NoError(t, err)
es := lib.NewExecutionState(lib.Options{}, et, 10, 50)
ctx, cancel, executor, _ := setupExecutor(
t, config, es,
simpleRunner(func(ctx context.Context) error {
close(started)
defer close(stopped)
select {
case <-ctx.Done():
t.Fatal("The iterations should've ended before the context")
case <-stop:
}
return nil
}),
)
defer cancel()
errCh := make(chan error)
go func() { errCh <- executor.Run(ctx, nil) }()

<-started
// 500 milliseconds more than the duration and 500 less than the gracefulStop
time.Sleep(time.Millisecond * 1500)
close(stop)
<-stopped

require.NoError(t, <-errCh)
}

func TestRampingVUsGracefulStopStops(t *testing.T) {
t.Parallel()

config := RampingVUsConfig{
BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(time.Second)},
StartVUs: null.IntFrom(1),
Stages: []Stage{
{
Duration: types.NullDurationFrom(1 * time.Second),
Target: null.IntFrom(1),
},
},
}

var (
started = make(chan struct{}) // the iteration started
stopped = make(chan struct{}) // the iteration stopped
stop = make(chan struct{}) // the iteration should stop
)

et, err := lib.NewExecutionTuple(nil, nil)
require.NoError(t, err)
es := lib.NewExecutionState(lib.Options{}, et, 10, 50)
ctx, cancel, executor, _ := setupExecutor(
t, config, es,
simpleRunner(func(ctx context.Context) error {
close(started)
defer close(stopped)
select {
case <-ctx.Done():
case <-stop:
t.Fatal("The iterations shouldn't have ended before the context")
}
return nil
}),
)
defer cancel()
errCh := make(chan error)
go func() { errCh <- executor.Run(ctx, nil) }()

<-started
// 500 milliseconds more than the gracefulStop + duration
time.Sleep(time.Millisecond * 2500)
close(stop)
<-stopped

require.NoError(t, <-errCh)
}

func TestRampingVUsGracefulRampDown(t *testing.T) {
t.Parallel()

config := RampingVUsConfig{
BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(5 * time.Second)},
StartVUs: null.IntFrom(2),
GracefulRampDown: types.NullDurationFrom(5 * time.Second),
Stages: []Stage{
{
Duration: types.NullDurationFrom(1 * time.Second),
Target: null.IntFrom(2),
},
{
Duration: types.NullDurationFrom(1 * time.Second),
Target: null.IntFrom(0),
},
},
}

var (
started = make(chan struct{}) // the iteration started
stopped = make(chan struct{}) // the iteration stopped
stop = make(chan struct{}) // the iteration should stop
)

et, err := lib.NewExecutionTuple(nil, nil)
require.NoError(t, err)
es := lib.NewExecutionState(lib.Options{}, et, 10, 50)
ctx, cancel, executor, _ := setupExecutor(
t, config, es,
simpleRunner(func(ctx context.Context) error {
if lib.GetState(ctx).Vu == 1 { // the first VU will wait here to do stuff
close(started)
defer close(stopped)
select {
case <-ctx.Done():
t.Fatal("The iterations shouldn't have ended before the context")
case <-stop:
}
} else { // the other VU will just sleep long enough
time.Sleep(2500 * time.Millisecond)
}
return nil
}),
)
defer cancel()
errCh := make(chan error)
go func() { errCh <- executor.Run(ctx, nil) }()

<-started
// 500 milliseconds more than the gracefulRampDown + duration
time.Sleep(2500 * time.Millisecond)
close(stop)
<-stopped

select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(time.Second): // way too much time
t.Fatal("Execution should've ended already")
}
}

// Ensure there's no wobble of VUs during graceful ramp-down, without segments.
// See https://github.com/loadimpact/k6/issues/1296
func TestRampingVUsRampDownNoWobble(t *testing.T) {
@@ -126,7 +287,7 @@ func TestRampingVUsRampDownNoWobble(t *testing.T) {
ctx, cancel, executor, _ := setupExecutor(
t, config, es,
simpleRunner(func(ctx context.Context) error {
time.Sleep(1 * time.Second)
time.Sleep(500 * time.Millisecond)
return nil
}),
)
@@ -136,7 +297,10 @@ func TestRampingVUsRampDownNoWobble(t *testing.T) {
100 * time.Millisecond,
3000 * time.Millisecond,
}
const rampDownSamples = 50
const rampDownSampleTime = 50 * time.Millisecond
var rampDownSamples = int(time.Duration(
config.Stages[len(config.Stages)-1].Duration.Duration+config.GracefulRampDown.Duration,
) / rampDownSampleTime)

errCh := make(chan error)
go func() { errCh <- executor.Run(ctx, nil) }()
@@ -149,7 +313,7 @@ func TestRampingVUsRampDownNoWobble(t *testing.T) {

// Sample ramp-down at a higher rate
for i := len(sampleTimes); i < rampDownSamples; i++ {
time.Sleep(50 * time.Millisecond)
time.Sleep(rampDownSampleTime)
result[i] = es.GetCurrentlyActiveVUsCount()
}
