Skip to content

Commit

Permalink
Ensure all VUs are gracefully aborted when an init error occurs
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Dec 5, 2022
1 parent 0625a43 commit 11564b3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
7 changes: 0 additions & 7 deletions cmd/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,14 +1069,7 @@ func TestAbortedByScriptInitError(t *testing.T) {
)
newRootCommand(ts.globalState).execute()

// FIXME: remove this locking after VU initialization accepts a context and
// is properly synchronized: currently when a test is aborted during the
// init phase, some logs might be emitted after the above command returns...
// see: https://github.com/grafana/k6/issues/2790
ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()

t.Log(stdOut)
assert.Contains(t, stdOut, `level=error msg="Error: oops in 2\n\tat file:///`)
assert.Contains(t, stdOut, `hint="error while initializing VU #2 (script exception)"`)
Expand Down
41 changes: 25 additions & 16 deletions core/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,12 @@ func (e *ExecutionScheduler) initVUsConcurrently(
ctx context.Context, samplesOut chan<- metrics.SampleContainer, count uint64,
concurrency int, logger logrus.FieldLogger,
) chan error {
doneInits := make(chan error, count) // poor man's early-return waitgroup
doneInits := make(chan error, count) // poor man's waitgroup with results
limiter := make(chan struct{})

for i := 0; i < concurrency; i++ {
go func() {
for range limiter {
// TODO: actually pass the context when we initialize VUs here,
// so we can cancel that initialization if there is an error,
// see https://github.com/grafana/k6/issues/2790
newVU, err := e.initVU(ctx, samplesOut, logger)
if err == nil {
e.state.AddInitializedVU(newVU)
Expand All @@ -197,6 +194,10 @@ func (e *ExecutionScheduler) initVUsConcurrently(
select {
case limiter <- struct{}{}:
case <-ctx.Done():
for skipVu := vuNum; skipVu < count; skipVu++ {
// do not even start initializing the remaining VUs
doneInits <- ctx.Err()
}
return
}
}
Expand Down Expand Up @@ -290,23 +291,31 @@ func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics
}),
)

// TODO: once VU initialization accepts a context, when a VU init fails,
// cancel the context and actually wait for all VUs to finish before this
// function returns - that way we won't have any trailing logs, see
// https://github.com/grafana/k6/issues/2790
var initErr error
for vuNum := uint64(0); vuNum < vusToInitialize; vuNum++ {
var err error
select {
case err := <-doneInits:
if err != nil {
logger.WithError(err).Debug("VU initialization returned with an error, aborting...")
// the context's cancel() is called in a defer above and will
// abort any in-flight VU initializations
return err
case err = <-doneInits:
if err == nil {
atomic.AddUint64(initializedVUs, 1)
}
atomic.AddUint64(initializedVUs, 1)
case <-ctx.Done():
return ctx.Err()
err = ctx.Err()
}

if err == nil || initErr != nil {
// No error or a previous init error was already saved and we are
// just waiting for VUs to finish aborting
continue
}

logger.WithError(err).Debug("VU initialization returned with an error, aborting...")
initErr = err
cancel()
}

if initErr != nil {
return initErr
}

e.state.SetInitVUFunc(func(ctx context.Context, logger *logrus.Entry) (lib.InitializedVU, error) {
Expand Down

0 comments on commit 11564b3

Please sign in to comment.