From 24cbf79986d615ac7ce201ba43877b62f9299b9b Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Thu, 1 Dec 2022 17:12:45 +0200 Subject: [PATCH 1/7] Thread a real context when initializing new VUs With some extra work, this will allow us to gracefully interrupt VUs in the middle of their initialization. --- core/local/local.go | 8 +- js/bundle.go | 8 +- js/bundle_test.go | 17 +- js/console_test.go | 20 +-- js/empty_iterations_bench_test.go | 2 +- js/http_bench_test.go | 8 +- js/init_and_modules_test.go | 4 +- js/initcontext.go | 2 +- js/initcontext_test.go | 26 +-- js/module_loading_test.go | 77 ++++----- js/modules/k6/marshalling_test.go | 2 +- js/path_resolution_test.go | 13 +- js/runner.go | 30 ++-- js/runner_test.go | 187 ++++++++++++---------- js/share_test.go | 7 +- lib/executor/common_test.go | 2 +- lib/executor/ramping_arrival_rate_test.go | 8 +- lib/executor/vu_handle_test.go | 8 +- lib/runner.go | 2 +- lib/testutils/minirunner/minirunner.go | 4 +- 20 files changed, 228 insertions(+), 207 deletions(-) diff --git a/core/local/local.go b/core/local/local.go index 8b043fd7292..60312b14aee 100644 --- a/core/local/local.go +++ b/core/local/local.go @@ -135,12 +135,12 @@ func (e *ExecutionScheduler) GetExecutionPlan() []lib.ExecutionStep { // in the Init() method, and also passed to executors so they can initialize // any unplanned VUs themselves. func (e *ExecutionScheduler) initVU( - samplesOut chan<- metrics.SampleContainer, logger logrus.FieldLogger, + ctx context.Context, samplesOut chan<- metrics.SampleContainer, logger logrus.FieldLogger, ) (lib.InitializedVU, error) { // Get the VU IDs here, so that the VUs are (mostly) ordered by their // number in the channel buffer vuIDLocal, vuIDGlobal := e.state.GetUniqueVUIdentifiers() - vu, err := e.state.Test.Runner.NewVU(vuIDLocal, vuIDGlobal, samplesOut) + vu, err := e.state.Test.Runner.NewVU(ctx, vuIDLocal, vuIDGlobal, samplesOut) if err != nil { return nil, errext.WithHint(err, fmt.Sprintf("error while initializing VU #%d", vuIDGlobal)) } @@ -182,7 +182,7 @@ func (e *ExecutionScheduler) initVUsConcurrently( // TODO: actually pass the context when we initialize VUs here, // so we can cancel that initialization if there is an error, // see https://github.com/grafana/k6/issues/2790 - newVU, err := e.initVU(samplesOut, logger) + newVU, err := e.initVU(ctx, samplesOut, logger) if err == nil { e.state.AddInitializedVU(newVU) } @@ -310,7 +310,7 @@ func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics } e.state.SetInitVUFunc(func(ctx context.Context, logger *logrus.Entry) (lib.InitializedVU, error) { - return e.initVU(samplesOut, logger) + return e.initVU(ctx, samplesOut, logger) }) e.state.SetExecutionStatus(lib.ExecutionStatusInitExecutors) diff --git a/js/bundle.go b/js/bundle.go index b13ded9cdfe..ca30f1de861 100644 --- a/js/bundle.go +++ b/js/bundle.go @@ -239,10 +239,13 @@ func (b *Bundle) getExports(logger logrus.FieldLogger, rt *goja.Runtime, options } // Instantiate creates a new runtime from this bundle. -func (b *Bundle) Instantiate(logger logrus.FieldLogger, vuID uint64) (*BundleInstance, error) { +func (b *Bundle) Instantiate(ctx context.Context, logger logrus.FieldLogger, vuID uint64) (*BundleInstance, error) { // Instantiate the bundle into a new VM using a bound init context. This uses a context with a // runtime, but no state, to allow module-provided types to function within the init context. - vuImpl := &moduleVUImpl{runtime: goja.New()} + vuImpl := &moduleVUImpl{ + ctx: ctx, + runtime: goja.New(), + } init := newBoundInitContext(b.BaseInitContext, vuImpl) if err := b.instantiate(logger, vuImpl.runtime, init, vuID); err != nil { return nil, err @@ -326,7 +329,6 @@ func (b *Bundle) instantiate(logger logrus.FieldLogger, rt *goja.Runtime, init * Registry: b.registry, } unbindInit := b.setInitGlobals(rt, init) - init.moduleVUImpl.ctx = context.Background() init.moduleVUImpl.initEnv = initenv init.moduleVUImpl.eventLoop = eventloop.New(init.moduleVUImpl) pgm := b.initializeProgramObject(rt, init) diff --git a/js/bundle_test.go b/js/bundle_test.go index 8873478e846..ba86b87458a 100644 --- a/js/bundle_test.go +++ b/js/bundle_test.go @@ -1,6 +1,7 @@ package js import ( + "context" "crypto/tls" "fmt" "io/ioutil" @@ -476,7 +477,7 @@ func TestNewBundleFromArchive(t *testing.T) { logger := testutils.NewLogger(t) checkBundle := func(t *testing.T, b *Bundle) { require.Equal(t, lib.Options{VUs: null.IntFrom(12345)}, b.Options) - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) val, err := bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.NoError(t, err) @@ -569,7 +570,7 @@ func TestNewBundleFromArchive(t *testing.T) { } b, err := NewBundleFromArchive(getTestPreInitState(t, logger, nil), arc) require.NoError(t, err) - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) val, err := bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.NoError(t, err) @@ -713,7 +714,7 @@ func TestOpen(t *testing.T) { for source, b := range map[string]*Bundle{"source": sourceBundle, "archive": arcBundle} { b := b t.Run(source, func(t *testing.T) { - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) v, err := bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.NoError(t, err) @@ -749,7 +750,7 @@ func TestBundleInstantiate(t *testing.T) { require.NoError(t, err) logger := testutils.NewLogger(t) - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) v, err := bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.NoError(t, err) @@ -769,7 +770,7 @@ func TestBundleInstantiate(t *testing.T) { require.NoError(t, err) logger := testutils.NewLogger(t) - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) // Ensure `options` properties are correctly marshalled jsOptions := bi.getExported("options").ToObject(bi.Runtime) @@ -781,7 +782,7 @@ func TestBundleInstantiate(t *testing.T) { // Ensure options propagate correctly from outside to the script optOrig := b.Options.VUs b.Options.VUs = null.IntFrom(10) - bi2, err := b.Instantiate(logger, 0) + bi2, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) jsOptions = bi2.getExported("options").ToObject(bi2.Runtime) vus = jsOptions.Get("vus").Export() @@ -817,7 +818,7 @@ func TestBundleEnv(t *testing.T) { require.Equal(t, "1", b.RuntimeOptions.Env["TEST_A"]) require.Equal(t, "", b.RuntimeOptions.Env["TEST_B"]) - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) _, err = bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.NoError(t, err) @@ -853,7 +854,7 @@ func TestBundleNotSharable(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() for i := 0; i < vus; i++ { - bi, err := b.Instantiate(logger, uint64(i)) + bi, err := b.Instantiate(context.Background(), logger, uint64(i)) require.NoError(t, err) for j := 0; j < iters; j++ { bi.Runtime.Set("__ITER", j) diff --git a/js/console_test.go b/js/console_test.go index 9cbaf0c541e..e4c3c2a0b96 100644 --- a/js/console_test.go +++ b/js/console_test.go @@ -209,12 +209,12 @@ func TestConsoleLog(t *testing.T) { `exports.default = function() { console.log(%s); }`, tt.in)) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() samples := make(chan metrics.SampleContainer, 100) - initVU, err := r.newVU(1, 1, samples) + initVU, err := r.newVU(ctx, 1, 1, samples) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) logger := extractLogger(vu.(*ActiveVU).Console.logger) @@ -266,12 +266,13 @@ func TestConsoleLevels(t *testing.T) { )) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + samples := make(chan metrics.SampleContainer, 100) - initVU, err := r.newVU(1, 1, samples) + initVU, err := r.newVU(ctx, 1, 1, samples) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) logger := extractLogger(vu.(*ActiveVU).Console.logger) @@ -363,12 +364,13 @@ func TestFileConsole(t *testing.T) { }) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + samples := make(chan metrics.SampleContainer, 100) - initVU, err := r.newVU(1, 1, samples) + initVU, err := r.newVU(ctx, 1, 1, samples) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) logger := extractLogger(vu.(*ActiveVU).Console.logger) diff --git a/js/empty_iterations_bench_test.go b/js/empty_iterations_bench_test.go index 02d810a6ea2..91921811f80 100644 --- a/js/empty_iterations_bench_test.go +++ b/js/empty_iterations_bench_test.go @@ -22,7 +22,7 @@ func BenchmarkEmptyIteration(b *testing.B) { for range ch { } }() - initVU, err := r.NewVU(1, 1, ch) + initVU, err := r.NewVU(context.Background(), 1, 1, ch) require.NoError(b, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/js/http_bench_test.go b/js/http_bench_test.go index 02bb237398c..7f70b821d2a 100644 --- a/js/http_bench_test.go +++ b/js/http_bench_test.go @@ -45,10 +45,10 @@ func BenchmarkHTTPRequests(b *testing.B) { for range ch { } }() - initVU, err := r.NewVU(1, 1, ch) - require.NoError(b, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, ch) + require.NoError(b, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) b.StartTimer() for i := 0; i < b.N; i++ { @@ -83,10 +83,10 @@ func BenchmarkHTTPRequestsBase(b *testing.B) { for range ch { } }() - initVU, err := r.NewVU(1, 1, ch) - require.NoError(b, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, ch) + require.NoError(b, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) b.StartTimer() for i := 0; i < b.N; i++ { diff --git a/js/init_and_modules_test.go b/js/init_and_modules_test.go index 49a8e4b3af7..fc21d24fd5b 100644 --- a/js/init_and_modules_test.go +++ b/js/init_and_modules_test.go @@ -74,7 +74,7 @@ func TestNewJSRunnerWithCustomModule(t *testing.T) { assert.Equal(t, checkModule.initCtxCalled, 1) assert.Equal(t, checkModule.vuCtxCalled, 0) - vu, err := runner.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) + vu, err := runner.NewVU(context.Background(), 1, 1, make(chan metrics.SampleContainer, 100)) require.NoError(t, err) assert.Equal(t, checkModule.initCtxCalled, 2) assert.Equal(t, checkModule.vuCtxCalled, 0) @@ -104,7 +104,7 @@ func TestNewJSRunnerWithCustomModule(t *testing.T) { require.NoError(t, err) assert.Equal(t, checkModule.initCtxCalled, 3) // changes because we need to get the exported functions assert.Equal(t, checkModule.vuCtxCalled, 2) - vuFromArc, err := runnerFromArc.NewVU(2, 2, make(chan metrics.SampleContainer, 100)) + vuFromArc, err := runnerFromArc.NewVU(context.Background(), 2, 2, make(chan metrics.SampleContainer, 100)) require.NoError(t, err) assert.Equal(t, checkModule.initCtxCalled, 4) assert.Equal(t, checkModule.vuCtxCalled, 2) diff --git a/js/initcontext.go b/js/initcontext.go index cad1e15a339..88320825fbe 100644 --- a/js/initcontext.go +++ b/js/initcontext.go @@ -70,7 +70,7 @@ func NewInitContext( moduleRegistry: getJSModules(), exportsCache: make(map[string]goja.Value), moduleVUImpl: &moduleVUImpl{ - ctx: context.Background(), + ctx: context.Background(), // TODO: pass a real context here as well? runtime: rt, }, } diff --git a/js/initcontext_test.go b/js/initcontext_test.go index b19991aeef5..b4b05855e11 100644 --- a/js/initcontext_test.go +++ b/js/initcontext_test.go @@ -47,7 +47,7 @@ func TestInitContextRequire(t *testing.T) { `) require.NoError(t, err, "bundle error") - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) assert.NoError(t, err, "instance error") exports := bi.pgm.exports @@ -73,7 +73,7 @@ func TestInitContextRequire(t *testing.T) { `) require.NoError(t, err) - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) exports := bi.pgm.exports @@ -183,7 +183,7 @@ func TestInitContextRequire(t *testing.T) { assert.Contains(t, b.BaseInitContext.programs, "file://"+constPath) } - _, err = b.Instantiate(logger, 0) + _, err = b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) }) } @@ -207,7 +207,7 @@ func TestInitContextRequire(t *testing.T) { b, err := getSimpleBundle(t, "/script.js", data, fs) require.NoError(t, err) - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) _, err = bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) assert.NoError(t, err) @@ -236,7 +236,7 @@ func createAndReadFile(t *testing.T, file string, content []byte, expectedLength return nil, err } - bi, err := b.Instantiate(testutils.NewLogger(t), 0) + bi, err := b.Instantiate(context.Background(), testutils.NewLogger(t), 0) if err != nil { return nil, err } @@ -352,7 +352,7 @@ func TestRequestWithBinaryFile(t *testing.T) { `, srv.URL), fs) require.NoError(t, err) - bi, err := b.Instantiate(testutils.NewLogger(t), 0) + bi, err := b.Instantiate(context.Background(), testutils.NewLogger(t), 0) require.NoError(t, err) root, err := lib.NewGroup("", nil) @@ -499,7 +499,7 @@ func TestRequestWithMultipleBinaryFiles(t *testing.T) { `, srv.URL), fs) require.NoError(t, err) - bi, err := b.Instantiate(testutils.NewLogger(t), 0) + bi, err := b.Instantiate(context.Background(), testutils.NewLogger(t), 0) require.NoError(t, err) root, err := lib.NewGroup("", nil) @@ -550,7 +550,7 @@ func TestInitContextVU(t *testing.T) { export default function() { return vu; } `) require.NoError(t, err) - bi, err := b.Instantiate(testutils.NewLogger(t), 5) + bi, err := b.Instantiate(context.Background(), testutils.NewLogger(t), 5) require.NoError(t, err) v, err := bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.NoError(t, err) @@ -583,7 +583,7 @@ export default function(){ b, err := getSimpleBundle(t, "/script.js", data, fs) require.NoError(t, err) - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) _, err = bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.Error(t, err) @@ -614,7 +614,7 @@ export default function () { b, err := getSimpleBundle(t, "/script.js", data, fs) require.NoError(t, err) - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) _, err = bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.Error(t, err) @@ -646,7 +646,7 @@ export default function () { b, err := getSimpleBundle(t, "/script.js", data, fs) require.NoError(t, err) - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) _, err = bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.Error(t, err) @@ -677,7 +677,7 @@ export default function () { b, err := getSimpleBundle(t, "/script.js", data, fs) require.NoError(t, err) - bi, err := b.Instantiate(logger, 0) + bi, err := b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) _, err = bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.Error(t, err) @@ -715,6 +715,6 @@ func TestImportModificationsAreConsistentBetweenFiles(t *testing.T) { `, fs) require.NoError(t, err, "bundle error") - _, err = b.Instantiate(logger, 0) + _, err = b.Instantiate(context.Background(), logger, 0) require.NoError(t, err) } diff --git a/js/module_loading_test.go b/js/module_loading_test.go index 42e2d1bc89c..1f8ec02e151 100644 --- a/js/module_loading_test.go +++ b/js/module_loading_test.go @@ -105,14 +105,14 @@ func TestLoadOnceGlobalVars(t *testing.T) { t.Parallel() ch := newDevNullSampleChannel() defer close(ch) - initVU, err := r.NewVU(1, 1, ch) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) - require.NoError(t, err) - err = vu.RunOnce() + + initVU, err := r.NewVU(ctx, 1, 1, ch) require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) + require.NoError(t, vu.RunOnce()) }) } }) @@ -163,13 +163,12 @@ func TestLoadExportsIsUsableInModule(t *testing.T) { t.Parallel() ch := newDevNullSampleChannel() defer close(ch) - initVU, err := r.NewVU(1, 1, ch) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) - require.NoError(t, err) - err = vu.RunOnce() + initVU, err := r.NewVU(ctx, 1, 1, ch) require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) + require.NoError(t, vu.RunOnce()) }) } } @@ -218,13 +217,12 @@ func TestLoadDoesntBreakHTTPGet(t *testing.T) { t.Parallel() ch := newDevNullSampleChannel() defer close(ch) - initVU, err := r.NewVU(1, 1, ch) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) - err = vu.RunOnce() + initVU, err := r.NewVU(ctx, 1, 1, ch) require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) + require.NoError(t, vu.RunOnce()) }) } } @@ -271,22 +269,20 @@ func TestLoadGlobalVarsAreNotSharedBetweenVUs(t *testing.T) { t.Parallel() ch := newDevNullSampleChannel() defer close(ch) - initVU, err := r.NewVU(1, 1, ch) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) - err = vu.RunOnce() + initVU, err := r.NewVU(ctx, 1, 1, ch) require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) + require.NoError(t, vu.RunOnce()) // run a second VU - initVU, err = r.NewVU(2, 2, ch) - require.NoError(t, err) ctx, cancel = context.WithCancel(context.Background()) defer cancel() - vu = initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) - err = vu.RunOnce() + initVU, err = r.NewVU(ctx, 2, 2, ch) require.NoError(t, err) + vu = initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) + require.NoError(t, vu.RunOnce()) }) } } @@ -343,13 +339,12 @@ func TestLoadCycle(t *testing.T) { t.Parallel() ch := newDevNullSampleChannel() defer close(ch) - initVU, err := r.NewVU(1, 1, ch) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) - err = vu.RunOnce() + initVU, err := r.NewVU(ctx, 1, 1, ch) require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) + require.NoError(t, vu.RunOnce()) }) } } @@ -413,13 +408,12 @@ func TestLoadCycleBinding(t *testing.T) { t.Parallel() ch := newDevNullSampleChannel() defer close(ch) - initVU, err := r.NewVU(1, 1, ch) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) - err = vu.RunOnce() + initVU, err := r.NewVU(ctx, 1, 1, ch) require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) + require.NoError(t, vu.RunOnce()) }) } } @@ -485,13 +479,12 @@ func TestBrowserified(t *testing.T) { t.Parallel() ch := make(chan metrics.SampleContainer, 100) defer close(ch) - initVU, err := r.NewVU(1, 1, ch) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) - err = vu.RunOnce() + initVU, err := r.NewVU(ctx, 1, 1, ch) require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) + require.NoError(t, vu.RunOnce()) }) } } @@ -536,13 +529,12 @@ func TestLoadingUnexistingModuleDoesntPanic(t *testing.T) { t.Parallel() ch := newDevNullSampleChannel() defer close(ch) - initVU, err := r.NewVU(1, 1, ch) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) - err = vu.RunOnce() + initVU, err := r.NewVU(ctx, 1, 1, ch) require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) + require.NoError(t, vu.RunOnce()) }) } } @@ -580,13 +572,12 @@ func TestLoadingSourceMapsDoesntErrorOut(t *testing.T) { t.Parallel() ch := newDevNullSampleChannel() defer close(ch) - initVU, err := r.NewVU(1, 1, ch) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) - err = vu.RunOnce() + initVU, err := r.NewVU(ctx, 1, 1, ch) require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) + require.NoError(t, vu.RunOnce()) }) } } @@ -640,14 +631,12 @@ func TestOptionsAreGloballyReadable(t *testing.T) { t.Parallel() ch := newDevNullSampleChannel() defer close(ch) - initVU, err := r.NewVU(1, 1, ch) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, ch) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) require.NoError(t, err) - err = vu.RunOnce() - require.NoError(t, err) + require.NoError(t, vu.RunOnce()) }) } } diff --git a/js/modules/k6/marshalling_test.go b/js/modules/k6/marshalling_test.go index 8f3245f0b5f..70326ded19f 100644 --- a/js/modules/k6/marshalling_test.go +++ b/js/modules/k6/marshalling_test.go @@ -122,7 +122,7 @@ func TestSetupDataMarshalling(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() require.NoError(t, runner.Setup(ctx, samples)) - initVU, err := runner.NewVU(1, 1, samples) + initVU, err := runner.NewVU(ctx, 1, 1, samples) require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) assert.NoError(t, vu.RunOnce()) diff --git a/js/path_resolution_test.go b/js/path_resolution_test.go index f9bbe7074f2..0a0ad1aef8a 100644 --- a/js/path_resolution_test.go +++ b/js/path_resolution_test.go @@ -1,6 +1,7 @@ package js import ( + "context" "testing" "github.com/spf13/afero" @@ -26,7 +27,7 @@ func TestOpenPathResolution(t *testing.T) { b, err := getSimpleBundle(t, "/path/scripts/script.js", data, fs) require.NoError(t, err) - _, err = b.Instantiate(testutils.NewLogger(t), 0) + _, err = b.Instantiate(context.Background(), testutils.NewLogger(t), 0) require.NoError(t, err) }) @@ -47,7 +48,7 @@ func TestOpenPathResolution(t *testing.T) { b, err := getSimpleBundle(t, "/path/totally/different/directory/script.js", data, fs) require.NoError(t, err) - _, err = b.Instantiate(testutils.NewLogger(t), 0) + _, err = b.Instantiate(context.Background(), testutils.NewLogger(t), 0) require.NoError(t, err) }) @@ -71,7 +72,7 @@ func TestOpenPathResolution(t *testing.T) { b, err := getSimpleBundle(t, "/path/totally/different/directory/script.js", data, fs) require.NoError(t, err) - _, err = b.Instantiate(testutils.NewLogger(t), 0) + _, err = b.Instantiate(context.Background(), testutils.NewLogger(t), 0) require.NoError(t, err) }) } @@ -92,7 +93,7 @@ func TestRequirePathResolution(t *testing.T) { b, err := getSimpleBundle(t, "/path/scripts/script.js", data, fs) require.NoError(t, err) - _, err = b.Instantiate(testutils.NewLogger(t), 0) + _, err = b.Instantiate(context.Background(), testutils.NewLogger(t), 0) require.NoError(t, err) }) @@ -113,7 +114,7 @@ func TestRequirePathResolution(t *testing.T) { b, err := getSimpleBundle(t, "/path/totally/different/directory/script.js", data, fs) require.NoError(t, err) - _, err = b.Instantiate(testutils.NewLogger(t), 0) + _, err = b.Instantiate(context.Background(), testutils.NewLogger(t), 0) require.NoError(t, err) }) @@ -137,7 +138,7 @@ func TestRequirePathResolution(t *testing.T) { b, err := getSimpleBundle(t, "/path/totally/different/directory/script.js", data, fs) require.NoError(t, err) - _, err = b.Instantiate(testutils.NewLogger(t), 0) + _, err = b.Instantiate(context.Background(), testutils.NewLogger(t), 0) require.NoError(t, err) }) } diff --git a/js/runner.go b/js/runner.go index 72b1ccdd9b9..4f3c7fef009 100644 --- a/js/runner.go +++ b/js/runner.go @@ -112,8 +112,10 @@ func (r *Runner) MakeArchive() *lib.Archive { } // NewVU returns a new initialized VU. -func (r *Runner) NewVU(idLocal, idGlobal uint64, samplesOut chan<- metrics.SampleContainer) (lib.InitializedVU, error) { - vu, err := r.newVU(idLocal, idGlobal, samplesOut) +func (r *Runner) NewVU( + ctx context.Context, idLocal, idGlobal uint64, samplesOut chan<- metrics.SampleContainer, +) (lib.InitializedVU, error) { + vu, err := r.newVU(ctx, idLocal, idGlobal, samplesOut) if err != nil { return nil, err } @@ -121,9 +123,11 @@ func (r *Runner) NewVU(idLocal, idGlobal uint64, samplesOut chan<- metrics.Sampl } //nolint:funlen -func (r *Runner) newVU(idLocal, idGlobal uint64, samplesOut chan<- metrics.SampleContainer) (*VU, error) { +func (r *Runner) newVU( + ctx context.Context, idLocal, idGlobal uint64, samplesOut chan<- metrics.SampleContainer, +) (*VU, error) { // Instantiate a new bundle, make a VU out of it. - bi, err := r.Bundle.Instantiate(r.preInitState.Logger, idLocal) + bi, err := r.Bundle.Instantiate(ctx, r.preInitState.Logger, idLocal) if err != nil { return nil, err } @@ -354,7 +358,7 @@ func (r *Runner) IsExecutable(name string) bool { } // HandleSummary calls the specified summary callback, if supplied. -func (r *Runner) HandleSummary(ctx context.Context, summary *lib.Summary) (map[string]io.Reader, error) { +func (r *Runner) HandleSummary(parentCtx context.Context, summary *lib.Summary) (map[string]io.Reader, error) { summaryDataForJS := summarizeMetricsToObject(summary, r.Bundle.Options, r.setupData) out := make(chan metrics.SampleContainer, 100) @@ -365,7 +369,10 @@ func (r *Runner) HandleSummary(ctx context.Context, summary *lib.Summary) (map[s } }() - vu, err := r.newVU(0, 0, out) + ctx, cancel := context.WithTimeout(parentCtx, r.getTimeoutFor(consts.HandleSummaryFn)) + defer cancel() + + vu, err := r.newVU(ctx, 0, 0, out) if err != nil { return nil, err } @@ -378,8 +385,6 @@ func (r *Runner) HandleSummary(ctx context.Context, summary *lib.Summary) (map[s return nil, fmt.Errorf("exported identifier %s must be a function", consts.HandleSummaryFn) } - ctx, cancel := context.WithTimeout(ctx, r.getTimeoutFor(consts.HandleSummaryFn)) - defer cancel() go func() { <-ctx.Done() vu.Runtime.Interrupt(context.Canceled) @@ -503,12 +508,15 @@ func parseTTL(ttlS string) (time.Duration, error) { // Runs an exported function in its own temporary VU, optionally with an argument. Execution is // interrupted if the context expires. No error is returned if the part does not exist. func (r *Runner) runPart( - ctx context.Context, + parentCtx context.Context, out chan<- metrics.SampleContainer, name string, arg interface{}, ) (goja.Value, error) { - vu, err := r.newVU(0, 0, out) + ctx, cancel := context.WithCancel(parentCtx) + defer cancel() + + vu, err := r.newVU(ctx, 0, 0, out) if err != nil { return goja.Undefined(), err } @@ -517,8 +525,6 @@ func (r *Runner) runPart( return goja.Undefined(), nil } - ctx, cancel := context.WithCancel(ctx) - defer cancel() go func() { <-ctx.Done() vu.Runtime.Interrupt(context.Canceled) diff --git a/js/runner_test.go b/js/runner_test.go index 317ed826035..8645faddfc9 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -63,14 +63,16 @@ func TestRunnerNew(t *testing.T) { t.Run("NewVU", func(t *testing.T) { t.Parallel() - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) require.NoError(t, err) vuc, ok := initVU.(*VU) require.True(t, ok) assert.Equal(t, int64(0), vuc.getExported("counter").Export()) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) t.Run("RunOnce", func(t *testing.T) { err = vu.RunOnce() @@ -216,10 +218,10 @@ func TestOptionsSettingToScript(t *testing.T) { require.Equal(t, newOptions, r.GetOptions()) samples := make(chan metrics.SampleContainer, 100) - initVU, err := r.NewVU(1, 1, samples) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, samples) + require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) require.NoError(t, vu.RunOnce()) }) @@ -271,10 +273,10 @@ func TestOptionsPropagationToScript(t *testing.T) { t.Parallel() samples := make(chan metrics.SampleContainer, 100) - initVU, err := r.NewVU(1, 1, samples) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, samples) + require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) require.NoError(t, vu.RunOnce()) }) @@ -441,7 +443,7 @@ func testSetupDataHelper(t *testing.T, data string) { samples := make(chan metrics.SampleContainer, 100) require.NoError(t, r.Setup(ctx, samples)) - initVU, err := r.NewVU(1, 1, samples) + initVU, err := r.NewVU(ctx, 1, 1, samples) require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) require.NoError(t, vu.RunOnce()) @@ -502,10 +504,10 @@ func TestConsoleInInitContext(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() samples := make(chan metrics.SampleContainer, 100) - initVU, err := r.NewVU(1, 1, samples) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, samples) + require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) require.NoError(t, vu.RunOnce()) }) @@ -590,10 +592,10 @@ func TestRunnerIntegrationImports(t *testing.T) { for name, r := range testdata { r := r t.Run(name, func(t *testing.T) { - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.NoError(t, err) @@ -628,7 +630,10 @@ func TestVURunContext(t *testing.T) { r := r t.Run(name, func(t *testing.T) { t.Parallel() - vu, err := r.newVU(1, 1, make(chan metrics.SampleContainer, 100)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vu, err := r.newVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) require.NoError(t, err) fnCalled := false @@ -646,8 +651,7 @@ func TestVURunContext(t *testing.T) { assert.Equal(t, r.GetDefaultGroup(), state.Group) assert.Equal(t, vu.Transport, state.Transport) }) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + activeVU := vu.Activate(&lib.VUActivationParams{RunContext: ctx}) err = activeVU.RunOnce() require.NoError(t, err) @@ -685,11 +689,11 @@ func TestVURunInterrupt(t *testing.T) { } }() - vu, err := r.newVU(1, 1, samples) - require.NoError(t, err) - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) defer cancel() + + vu, err := r.newVU(ctx, 1, 1, samples) + require.NoError(t, err) activeVU := vu.Activate(&lib.VUActivationParams{RunContext: ctx}) err = activeVU.RunOnce() require.Error(t, err) @@ -730,7 +734,7 @@ func TestVURunInterruptDoesntPanic(t *testing.T) { }() var wg sync.WaitGroup - initVU, err := r.newVU(1, 1, samples) + initVU, err := r.newVU(ctx, 1, 1, samples) require.NoError(t, err) for i := 0; i < 100; i++ { wg.Add(1) @@ -786,7 +790,10 @@ func TestVUIntegrationGroups(t *testing.T) { r := r t.Run(name, func(t *testing.T) { t.Parallel() - vu, err := r.newVU(1, 1, make(chan metrics.SampleContainer, 100)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + vu, err := r.newVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) require.NoError(t, err) fnOuterCalled := false @@ -809,8 +816,7 @@ func TestVUIntegrationGroups(t *testing.T) { assert.Equal(t, "my group", g.Parent.Name) assert.Equal(t, r.GetDefaultGroup(), g.Parent.Parent) }) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + activeVU := vu.Activate(&lib.VUActivationParams{RunContext: ctx}) err = activeVU.RunOnce() require.NoError(t, err) @@ -848,11 +854,12 @@ func TestVUIntegrationMetrics(t *testing.T) { t.Parallel() samples := make(chan metrics.SampleContainer, 100) defer close(samples) - vu, err := r.newVU(1, 1, samples) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + vu, err := r.newVU(ctx, 1, 1, samples) + require.NoError(t, err) activeVU := vu.Activate(&lib.VUActivationParams{RunContext: ctx}) err = activeVU.RunOnce() require.NoError(t, err) @@ -1053,13 +1060,13 @@ func TestVUIntegrationInsecureRequests(t *testing.T) { t.Parallel() r.preInitState.Logger, _ = logtest.NewNullLogger() - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) require.NoError(t, err) initVU.(*VU).TLSConfig.RootCAs = x509.NewCertPool() //nolint:forcetypeassert initVU.(*VU).TLSConfig.RootCAs.AddCert(cert) //nolint:forcetypeassert - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() if data.errMsg != "" { @@ -1102,10 +1109,10 @@ func TestVUIntegrationBlacklistOption(t *testing.T) { r := r t.Run(name, func(t *testing.T) { t.Parallel() - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.Error(t, err) @@ -1144,10 +1151,10 @@ func TestVUIntegrationBlacklistScript(t *testing.T) { r := r t.Run(name, func(t *testing.T) { t.Parallel() - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.Error(t, err) @@ -1186,11 +1193,12 @@ func TestVUIntegrationBlockHostnamesOption(t *testing.T) { r := r t.Run(name, func(t *testing.T) { t.Parallel() - initVu, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + initVu, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) + vu := initVu.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.Error(t, err) @@ -1229,10 +1237,10 @@ func TestVUIntegrationBlockHostnamesScript(t *testing.T) { r := r t.Run(name, func(t *testing.T) { t.Parallel() - initVu, err := r.NewVU(0, 0, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVu, err := r.NewVU(ctx, 0, 0, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) vu := initVu.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.Error(t, err) @@ -1287,11 +1295,11 @@ func TestVUIntegrationHosts(t *testing.T) { r := r t.Run(name, func(t *testing.T) { t.Parallel() - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.NoError(t, err) @@ -1389,12 +1397,12 @@ func TestVUIntegrationTLSConfig(t *testing.T) { t.Parallel() r.preInitState.Logger, _ = logtest.NewNullLogger() - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) require.NoError(t, err) initVU.(*VU).TLSConfig.RootCAs = x509.NewCertPool() //nolint:forcetypeassert initVU.(*VU).TLSConfig.RootCAs.AddCert(cert) //nolint:forcetypeassert - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() if data.errMsg != "" { @@ -1416,10 +1424,10 @@ func TestVUIntegrationOpenFunctionError(t *testing.T) { `) require.NoError(t, err) - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.Error(t, err) @@ -1434,10 +1442,10 @@ func TestVUIntegrationOpenFunctionErrorWhenSneaky(t *testing.T) { `) require.NoError(t, err) - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.Error(t, err) @@ -1464,7 +1472,7 @@ func TestVUDoesOpenUnderV0Condition(t *testing.T) { r, err := getSimpleRunner(t, "/script.js", data, fs) require.NoError(t, err) - _, err = r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) + _, err = r.NewVU(context.Background(), 1, 1, make(chan metrics.SampleContainer, 100)) require.NoError(t, err) } @@ -1488,7 +1496,7 @@ func TestVUDoesNotOpenUnderConditions(t *testing.T) { r, err := getSimpleRunner(t, "/script.js", data, fs) require.NoError(t, err) - _, err = r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) + _, err = r.NewVU(context.Background(), 1, 1, make(chan metrics.SampleContainer, 100)) require.Error(t, err) assert.Contains(t, err.Error(), "open() can't be used with files that weren't previously opened during initialization (__VU==0)") } @@ -1512,7 +1520,7 @@ func TestVUDoesNonExistingPathnUnderConditions(t *testing.T) { r, err := getSimpleRunner(t, "/script.js", data, fs) require.NoError(t, err) - _, err = r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) + _, err = r.NewVU(context.Background(), 1, 1, make(chan metrics.SampleContainer, 100)) require.Error(t, err) assert.Contains(t, err.Error(), "open() can't be used with files that weren't previously opened during initialization (__VU==0)") } @@ -1558,10 +1566,10 @@ func TestVUIntegrationCookiesReset(t *testing.T) { r := r t.Run(name, func(t *testing.T) { t.Parallel() - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) for i := 0; i < 2; i++ { require.NoError(t, vu.RunOnce()) @@ -1618,11 +1626,12 @@ func TestVUIntegrationCookiesNoReset(t *testing.T) { r := r t.Run(name, func(t *testing.T) { t.Parallel() - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.NoError(t, err) @@ -1658,11 +1667,12 @@ func TestVUIntegrationVUID(t *testing.T) { r := r t.Run(name, func(t *testing.T) { t.Parallel() - initVU, err := r.NewVU(1234, 1234, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + initVU, err := r.NewVU(ctx, 1234, 1234, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.NoError(t, err) @@ -1815,10 +1825,10 @@ func TestVUIntegrationClientCerts(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() r.preInitState.Logger, _ = logtest.NewNullLogger() - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() if len(data.errMsg) > 0 { @@ -1986,10 +1996,10 @@ func TestArchiveRunningIntegrity(t *testing.T) { err = r.Setup(ctx, ch) cancel() require.NoError(t, err) - initVU, err := r.NewVU(1, 1, ch) - require.NoError(t, err) ctx, cancel = context.WithCancel(context.Background()) defer cancel() + initVU, err := r.NewVU(ctx, 1, 1, ch) + require.NoError(t, err) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.NoError(t, err) @@ -2068,11 +2078,12 @@ func TestStuffNotPanicking(t *testing.T) { `)) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan metrics.SampleContainer, 1000) - initVU, err := r.NewVU(1, 1, ch) + initVU, err := r.NewVU(ctx, 1, 1, ch) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) errC := make(chan error) go func() { errC <- vu.RunOnce() }() @@ -2102,11 +2113,12 @@ func TestPanicOnSimpleHTML(t *testing.T) { `) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan metrics.SampleContainer, 1000) - initVU, err := r.NewVU(1, 1, ch) + initVU, err := r.NewVU(ctx, 1, 1, ch) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) errC := make(chan error) go func() { errC <- vu.RunOnce() }() @@ -2185,7 +2197,7 @@ func TestSystemTags(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - vu, err := r.NewVU(uint64(num), 0, samples) + vu, err := r.NewVU(ctx, uint64(num), 0, samples) require.NoError(t, err) activeVU := vu.Activate(&lib.VUActivationParams{ RunContext: ctx, @@ -2237,7 +2249,10 @@ func TestVUPanic(t *testing.T) { r := r t.Run(name, func(t *testing.T) { t.Parallel() - initVU, err := r.NewVU(1, 1234, make(chan metrics.SampleContainer, 100)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + initVU, err := r.NewVU(ctx, 1, 1234, make(chan metrics.SampleContainer, 100)) require.NoError(t, err) logger := logrus.New() @@ -2248,8 +2263,6 @@ func TestVUPanic(t *testing.T) { } logger.AddHook(&hook) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) vu.(*ActiveVU).Runtime.Set("panic", func(str string) { panic(str) }) vu.(*ActiveVU).state.Logger = logger @@ -2299,7 +2312,10 @@ func runMultiFileTestCase(t *testing.T, tc multiFileTestCase, tb *httpmultibin.H options := runner.GetOptions() require.Empty(t, options.Validate()) - vu, err := runner.NewVU(1, 1, tc.samples) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + vu, err := runner.NewVU(ctx, 1, 1, tc.samples) require.NoError(t, err) jsVU, ok := vu.(*VU) @@ -2307,8 +2323,6 @@ func runMultiFileTestCase(t *testing.T, tc multiFileTestCase, tb *httpmultibin.H jsVU.state.Dialer = tb.Dialer jsVU.state.TLSConfig = tb.TLSClientConfig - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() activeVU := vu.Activate(&lib.VUActivationParams{RunContext: ctx}) err = activeVU.RunOnce() @@ -2331,7 +2345,7 @@ func runMultiFileTestCase(t *testing.T, tc multiFileTestCase, tb *httpmultibin.H RuntimeOptions: tc.rtOpts, }, arc) require.NoError(t, err) - vuFromArc, err := runnerFromArc.NewVU(2, 2, tc.samples) + vuFromArc, err := runnerFromArc.NewVU(ctx, 2, 2, tc.samples) require.NoError(t, err) jsVUFromArc, ok := vuFromArc.(*VU) require.True(t, ok) @@ -2467,10 +2481,10 @@ func TestMinIterationDurationIsCancellable(t *testing.T) { require.NoError(t, err) ch := make(chan metrics.SampleContainer, 1000) - initVU, err := r.NewVU(1, 1, ch) + ctx, cancel := context.WithCancel(context.Background()) + initVU, err := r.NewVU(ctx, 1, 1, ch) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) errC := make(chan error) go func() { errC <- vu.RunOnce() }() @@ -2560,11 +2574,12 @@ func TestForceHTTP1Feature(t *testing.T) { for name, r := range runners { r := r t.Run(name, func(t *testing.T) { - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.NoError(t, err) @@ -2642,7 +2657,11 @@ func TestExecutionInfo(t *testing.T) { r.Bundle.Options.SystemTags = &metrics.DefaultSystemTagSet samples := make(chan metrics.SampleContainer, 100) - initVU, err := r.NewVU(1, 10, samples) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + initVU, err := r.NewVU(ctx, 1, 10, samples) require.NoError(t, err) testRunState := &lib.TestRunState{ @@ -2654,9 +2673,6 @@ func TestExecutionInfo(t *testing.T) { execScheduler, err := local.NewExecutionScheduler(testRunState) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ctx = lib.WithExecutionState(ctx, execScheduler.GetState()) ctx = lib.WithScenarioState(ctx, &lib.ScenarioState{ Name: "default", @@ -2711,11 +2727,12 @@ exports.default = () => { r := r t.Run(name, func(t *testing.T) { t.Parallel() - initVU, err := r.NewVU(1, 1, make(chan metrics.SampleContainer, 100)) - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + initVU, err := r.NewVU(ctx, 1, 1, make(chan metrics.SampleContainer, 100)) + require.NoError(t, err) + vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.NoError(t, err) diff --git a/js/share_test.go b/js/share_test.go index b4bd87b9f24..a15a0af903f 100644 --- a/js/share_test.go +++ b/js/share_test.go @@ -81,12 +81,13 @@ exports.default = function() { r := r t.Run(name, func(t *testing.T) { t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + samples := make(chan metrics.SampleContainer, 100) - initVU, err := r.NewVU(1, 1, samples) + initVU, err := r.NewVU(ctx, 1, 1, samples) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() vu := initVU.Activate(&lib.VUActivationParams{RunContext: ctx}) err = vu.RunOnce() require.NoError(t, err) diff --git a/lib/executor/common_test.go b/lib/executor/common_test.go index 5fafa0b2456..bca7b3328ab 100644 --- a/lib/executor/common_test.go +++ b/lib/executor/common_test.go @@ -55,7 +55,7 @@ func setupExecutor(t testing.TB, config lib.ExecutorConfig, es *lib.ExecutionSta initVUFunc := func(_ context.Context, logger *logrus.Entry) (lib.InitializedVU, error) { idl, idg := es.GetUniqueVUIdentifiers() - return es.Test.Runner.NewVU(idl, idg, engineOut) + return es.Test.Runner.NewVU(ctx, idl, idg, engineOut) } es.SetInitVUFunc(initVUFunc) diff --git a/lib/executor/ramping_arrival_rate_test.go b/lib/executor/ramping_arrival_rate_test.go index 09bbb03624b..8f69d92b705 100644 --- a/lib/executor/ramping_arrival_rate_test.go +++ b/lib/executor/ramping_arrival_rate_test.go @@ -137,7 +137,7 @@ func TestRampingArrivalRateRunUnplannedVUs(t *testing.T) { defer test.cancel() engineOut := make(chan metrics.SampleContainer, 1000) - test.state.SetInitVUFunc(func(_ context.Context, logger *logrus.Entry) (lib.InitializedVU, error) { + test.state.SetInitVUFunc(func(ctx context.Context, logger *logrus.Entry) (lib.InitializedVU, error) { cur := atomic.LoadInt64(&count) require.Equal(t, cur, int64(1)) time.Sleep(time.Second / 2) @@ -157,7 +157,7 @@ func TestRampingArrivalRateRunUnplannedVUs(t *testing.T) { cur = atomic.LoadInt64(&count) require.NotEqual(t, cur, int64(2)) idl, idg := test.state.GetUniqueVUIdentifiers() - return runner.NewVU(idl, idg, engineOut) + return runner.NewVU(ctx, idl, idg, engineOut) }) assert.NoError(t, test.executor.Run(test.ctx, engineOut)) @@ -197,7 +197,7 @@ func TestRampingArrivalRateRunCorrectRateWithSlowRate(t *testing.T) { defer test.cancel() engineOut := make(chan metrics.SampleContainer, 1000) - test.state.SetInitVUFunc(func(_ context.Context, logger *logrus.Entry) (lib.InitializedVU, error) { + test.state.SetInitVUFunc(func(ctx context.Context, logger *logrus.Entry) (lib.InitializedVU, error) { t.Log("init") cur := atomic.LoadInt64(&count) require.Equal(t, cur, int64(1)) @@ -208,7 +208,7 @@ func TestRampingArrivalRateRunCorrectRateWithSlowRate(t *testing.T) { require.NotEqual(t, cur, int64(1)) idl, idg := test.state.GetUniqueVUIdentifiers() - return runner.NewVU(idl, idg, engineOut) + return runner.NewVU(ctx, idl, idg, engineOut) }) assert.NoError(t, test.executor.Run(test.ctx, engineOut)) diff --git a/lib/executor/vu_handle_test.go b/lib/executor/vu_handle_test.go index 909f7e0b45c..ba6cf7d30da 100644 --- a/lib/executor/vu_handle_test.go +++ b/lib/executor/vu_handle_test.go @@ -42,7 +42,7 @@ func TestVUHandleRace(t *testing.T) { var getVUCount int64 var returnVUCount int64 getVU := func() (lib.InitializedVU, error) { - return runner.NewVU(uint64(atomic.AddInt64(&getVUCount, 1)), 0, nil) + return runner.NewVU(ctx, uint64(atomic.AddInt64(&getVUCount, 1)), 0, nil) } returnVU := func(_ lib.InitializedVU) { @@ -134,7 +134,7 @@ func TestVUHandleStartStopRace(t *testing.T) { getVU := func() (lib.InitializedVU, error) { returned = make(chan struct{}) - return runner.NewVU(atomic.AddUint64(&vuID, 1), 0, nil) + return runner.NewVU(ctx, atomic.AddUint64(&vuID, 1), 0, nil) } returnVU := func(v lib.InitializedVU) { @@ -196,7 +196,7 @@ type handleVUTest struct { } func (h *handleVUTest) getVU() (lib.InitializedVU, error) { - return h.runner.NewVU(uint64(atomic.AddUint32(&h.getVUCount, 1)), 0, nil) + return h.runner.NewVU(context.Background(), uint64(atomic.AddUint32(&h.getVUCount, 1)), 0, nil) } func (h *handleVUTest) returnVU(_ lib.InitializedVU) { @@ -371,7 +371,7 @@ func BenchmarkVUHandleIterations(b *testing.B) { return nil } getVU := func() (lib.InitializedVU, error) { - return runner.NewVU(uint64(atomic.AddUint32(&getVUCount, 1)), 0, nil) + return runner.NewVU(context.Background(), uint64(atomic.AddUint32(&getVUCount, 1)), 0, nil) } returnVU := func(_ lib.InitializedVU) { diff --git a/lib/runner.go b/lib/runner.go index 4365c6aa925..e202966692c 100644 --- a/lib/runner.go +++ b/lib/runner.go @@ -54,7 +54,7 @@ type Runner interface { // Spawns a new VU. It's fine to make this function rather heavy, if it means a performance // improvement at runtime. Remember, this is called once per VU and normally only at the start // of a test - RunOnce() may be called hundreds of thousands of times, and must be fast. - NewVU(idLocal, idGlobal uint64, out chan<- metrics.SampleContainer) (InitializedVU, error) + NewVU(ctx context.Context, idLocal, idGlobal uint64, out chan<- metrics.SampleContainer) (InitializedVU, error) // Runs pre-test setup, if applicable. Setup(ctx context.Context, out chan<- metrics.SampleContainer) error diff --git a/lib/testutils/minirunner/minirunner.go b/lib/testutils/minirunner/minirunner.go index 45b44a2bf33..25d09bba4ee 100644 --- a/lib/testutils/minirunner/minirunner.go +++ b/lib/testutils/minirunner/minirunner.go @@ -40,7 +40,9 @@ func (r MiniRunner) MakeArchive() *lib.Archive { } // NewVU returns a new VU with an incremental ID. -func (r *MiniRunner) NewVU(idLocal, idGlobal uint64, out chan<- metrics.SampleContainer) (lib.InitializedVU, error) { +func (r *MiniRunner) NewVU( + _ context.Context, idLocal, idGlobal uint64, out chan<- metrics.SampleContainer, +) (lib.InitializedVU, error) { state := &lib.State{VUID: idLocal, VUIDGlobal: idGlobal, Iteration: int64(-1)} if r.runTags != nil { state.Tags = lib.NewVUStateTags(r.runTags) From d45965757f6091746bb8525ca3ca7c6ae61d815c Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Thu, 1 Dec 2022 18:23:17 +0200 Subject: [PATCH 2/7] Simplify the js/Bundle.instantiate() API --- js/bundle.go | 16 +++++++++------- js/bundle_test.go | 18 ++++++++---------- js/initcontext_test.go | 36 +++++++++++++----------------------- js/path_resolution_test.go | 13 ++++++------- js/runner.go | 2 +- 5 files changed, 37 insertions(+), 48 deletions(-) diff --git a/js/bundle.go b/js/bundle.go index ca30f1de861..e44c0ec22f6 100644 --- a/js/bundle.go +++ b/js/bundle.go @@ -93,7 +93,7 @@ func NewBundle( exports: make(map[string]goja.Callable), registry: piState.Registry, } - if err = bundle.instantiate(piState.Logger, rt, bundle.BaseInitContext, 0); err != nil { + if err = bundle.instantiate(bundle.BaseInitContext, 0); err != nil { return nil, err } @@ -157,7 +157,7 @@ func NewBundleFromArchive(piState *lib.TestPreInitState, arc *lib.Archive) (*Bun registry: piState.Registry, } - if err = bundle.instantiate(piState.Logger, rt, bundle.BaseInitContext, 0); err != nil { + if err = bundle.instantiate(bundle.BaseInitContext, 0); err != nil { return nil, err } @@ -239,19 +239,19 @@ func (b *Bundle) getExports(logger logrus.FieldLogger, rt *goja.Runtime, options } // Instantiate creates a new runtime from this bundle. -func (b *Bundle) Instantiate(ctx context.Context, logger logrus.FieldLogger, vuID uint64) (*BundleInstance, error) { +func (b *Bundle) Instantiate(ctx context.Context, vuID uint64) (*BundleInstance, error) { // Instantiate the bundle into a new VM using a bound init context. This uses a context with a // runtime, but no state, to allow module-provided types to function within the init context. + rt := goja.New() vuImpl := &moduleVUImpl{ ctx: ctx, - runtime: goja.New(), + runtime: rt, } init := newBoundInitContext(b.BaseInitContext, vuImpl) - if err := b.instantiate(logger, vuImpl.runtime, init, vuID); err != nil { + if err := b.instantiate(init, vuID); err != nil { return nil, err } - rt := vuImpl.runtime pgm := init.programs[b.Filename.String()] // this is the main script and it's always present bi := &BundleInstance{ Runtime: rt, @@ -306,7 +306,9 @@ func (b *Bundle) initializeProgramObject(rt *goja.Runtime, init *InitContext) pr return pgm } -func (b *Bundle) instantiate(logger logrus.FieldLogger, rt *goja.Runtime, init *InitContext, vuID uint64) (err error) { +func (b *Bundle) instantiate(init *InitContext, vuID uint64) (err error) { + rt := init.moduleVUImpl.runtime + logger := init.logger rt.SetFieldNameMapper(common.FieldNameMapper{}) rt.SetRandSource(common.NewRandSource()) diff --git a/js/bundle_test.go b/js/bundle_test.go index ba86b87458a..658523bb7b5 100644 --- a/js/bundle_test.go +++ b/js/bundle_test.go @@ -477,7 +477,7 @@ func TestNewBundleFromArchive(t *testing.T) { logger := testutils.NewLogger(t) checkBundle := func(t *testing.T, b *Bundle) { require.Equal(t, lib.Options{VUs: null.IntFrom(12345)}, b.Options) - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) val, err := bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.NoError(t, err) @@ -570,7 +570,7 @@ func TestNewBundleFromArchive(t *testing.T) { } b, err := NewBundleFromArchive(getTestPreInitState(t, logger, nil), arc) require.NoError(t, err) - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) val, err := bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.NoError(t, err) @@ -714,7 +714,7 @@ func TestOpen(t *testing.T) { for source, b := range map[string]*Bundle{"source": sourceBundle, "archive": arcBundle} { b := b t.Run(source, func(t *testing.T) { - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) v, err := bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.NoError(t, err) @@ -748,9 +748,8 @@ func TestBundleInstantiate(t *testing.T) { export default function() { return val; } `) require.NoError(t, err) - logger := testutils.NewLogger(t) - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) v, err := bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.NoError(t, err) @@ -768,9 +767,8 @@ func TestBundleInstantiate(t *testing.T) { export default function() { return val; } `) require.NoError(t, err) - logger := testutils.NewLogger(t) - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) // Ensure `options` properties are correctly marshalled jsOptions := bi.getExported("options").ToObject(bi.Runtime) @@ -782,7 +780,7 @@ func TestBundleInstantiate(t *testing.T) { // Ensure options propagate correctly from outside to the script optOrig := b.Options.VUs b.Options.VUs = null.IntFrom(10) - bi2, err := b.Instantiate(context.Background(), logger, 0) + bi2, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) jsOptions = bi2.getExported("options").ToObject(bi2.Runtime) vus = jsOptions.Get("vus").Export() @@ -818,7 +816,7 @@ func TestBundleEnv(t *testing.T) { require.Equal(t, "1", b.RuntimeOptions.Env["TEST_A"]) require.Equal(t, "", b.RuntimeOptions.Env["TEST_B"]) - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) _, err = bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.NoError(t, err) @@ -854,7 +852,7 @@ func TestBundleNotSharable(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() for i := 0; i < vus; i++ { - bi, err := b.Instantiate(context.Background(), logger, uint64(i)) + bi, err := b.Instantiate(context.Background(), uint64(i)) require.NoError(t, err) for j := 0; j < iters; j++ { bi.Runtime.Set("__ITER", j) diff --git a/js/initcontext_test.go b/js/initcontext_test.go index b4b05855e11..00ef84ec387 100644 --- a/js/initcontext_test.go +++ b/js/initcontext_test.go @@ -21,7 +21,6 @@ import ( "go.k6.io/k6/lib" "go.k6.io/k6/lib/consts" "go.k6.io/k6/lib/netext" - "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/types" "go.k6.io/k6/metrics" ) @@ -38,7 +37,6 @@ func TestInitContextRequire(t *testing.T) { t.Run("k6", func(t *testing.T) { t.Parallel() - logger := testutils.NewLogger(t) b, err := getSimpleBundle(t, "/script.js", ` import k6 from "k6"; export let _k6 = k6; @@ -47,7 +45,7 @@ func TestInitContextRequire(t *testing.T) { `) require.NoError(t, err, "bundle error") - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) assert.NoError(t, err, "instance error") exports := bi.pgm.exports @@ -63,7 +61,6 @@ func TestInitContextRequire(t *testing.T) { }) t.Run("group", func(t *testing.T) { - logger := testutils.NewLogger(t) t.Parallel() b, err := getSimpleBundle(t, "/script.js", ` import { group } from "k6"; @@ -73,7 +70,7 @@ func TestInitContextRequire(t *testing.T) { `) require.NoError(t, err) - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) exports := bi.pgm.exports @@ -155,7 +152,6 @@ func TestInitContextRequire(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() fs := afero.NewMemMapFs() - logger := testutils.NewLogger(t) jsLib := `export default function() { return 12345; }` if constName != "" { @@ -183,7 +179,7 @@ func TestInitContextRequire(t *testing.T) { assert.Contains(t, b.BaseInitContext.programs, "file://"+constPath) } - _, err = b.Instantiate(context.Background(), logger, 0) + _, err = b.Instantiate(context.Background(), 0) require.NoError(t, err) }) } @@ -192,7 +188,6 @@ func TestInitContextRequire(t *testing.T) { t.Run("Isolation", func(t *testing.T) { t.Parallel() - logger := testutils.NewLogger(t) fs := afero.NewMemMapFs() require.NoError(t, afero.WriteFile(fs, "/a.js", []byte(`const myvar = "a";`), 0o644)) require.NoError(t, afero.WriteFile(fs, "/b.js", []byte(`const myvar = "b";`), 0o644)) @@ -207,7 +202,7 @@ func TestInitContextRequire(t *testing.T) { b, err := getSimpleBundle(t, "/script.js", data, fs) require.NoError(t, err) - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) _, err = bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) assert.NoError(t, err) @@ -236,7 +231,7 @@ func createAndReadFile(t *testing.T, file string, content []byte, expectedLength return nil, err } - bi, err := b.Instantiate(context.Background(), testutils.NewLogger(t), 0) + bi, err := b.Instantiate(context.Background(), 0) if err != nil { return nil, err } @@ -352,7 +347,7 @@ func TestRequestWithBinaryFile(t *testing.T) { `, srv.URL), fs) require.NoError(t, err) - bi, err := b.Instantiate(context.Background(), testutils.NewLogger(t), 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) root, err := lib.NewGroup("", nil) @@ -499,7 +494,7 @@ func TestRequestWithMultipleBinaryFiles(t *testing.T) { `, srv.URL), fs) require.NoError(t, err) - bi, err := b.Instantiate(context.Background(), testutils.NewLogger(t), 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) root, err := lib.NewGroup("", nil) @@ -550,7 +545,7 @@ func TestInitContextVU(t *testing.T) { export default function() { return vu; } `) require.NoError(t, err) - bi, err := b.Instantiate(context.Background(), testutils.NewLogger(t), 5) + bi, err := b.Instantiate(context.Background(), 5) require.NoError(t, err) v, err := bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.NoError(t, err) @@ -559,7 +554,6 @@ func TestInitContextVU(t *testing.T) { func TestSourceMaps(t *testing.T) { t.Parallel() - logger := testutils.NewLogger(t) fs := afero.NewMemMapFs() require.NoError(t, afero.WriteFile(fs, "/module1.js", []byte(` export function f2(){ @@ -583,7 +577,7 @@ export default function(){ b, err := getSimpleBundle(t, "/script.js", data, fs) require.NoError(t, err) - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) _, err = bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.Error(t, err) @@ -594,7 +588,6 @@ export default function(){ func TestSourceMapsExternal(t *testing.T) { t.Parallel() - logger := testutils.NewLogger(t) fs := afero.NewMemMapFs() // This example is created through the template-typescript require.NoError(t, afero.WriteFile(fs, "/test1.js", []byte(` @@ -614,7 +607,7 @@ export default function () { b, err := getSimpleBundle(t, "/script.js", data, fs) require.NoError(t, err) - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) _, err = bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.Error(t, err) @@ -625,7 +618,6 @@ export default function () { func TestSourceMapsExternalExtented(t *testing.T) { t.Parallel() - logger := testutils.NewLogger(t) fs := afero.NewMemMapFs() // This example is created through the template-typescript // but was exported to use import/export syntax so it has to go through babel @@ -646,7 +638,7 @@ export default function () { b, err := getSimpleBundle(t, "/script.js", data, fs) require.NoError(t, err) - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) _, err = bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.Error(t, err) @@ -659,7 +651,6 @@ export default function () { func TestSourceMapsExternalExtentedInlined(t *testing.T) { t.Parallel() - logger := testutils.NewLogger(t) fs := afero.NewMemMapFs() // This example is created through the template-typescript // but was exported to use import/export syntax so it has to go through babel @@ -677,7 +668,7 @@ export default function () { b, err := getSimpleBundle(t, "/script.js", data, fs) require.NoError(t, err) - bi, err := b.Instantiate(context.Background(), logger, 0) + bi, err := b.Instantiate(context.Background(), 0) require.NoError(t, err) _, err = bi.getCallableExport(consts.DefaultFn)(goja.Undefined()) require.Error(t, err) @@ -690,7 +681,6 @@ export default function () { func TestImportModificationsAreConsistentBetweenFiles(t *testing.T) { t.Parallel() - logger := testutils.NewLogger(t) fs := afero.NewMemMapFs() require.NoError(t, afero.WriteFile(fs, "/notk6.js", []byte(`export default {group}; function group() {}`), 0o644)) require.NoError(t, afero.WriteFile(fs, "/instrument.js", []byte(` @@ -715,6 +705,6 @@ func TestImportModificationsAreConsistentBetweenFiles(t *testing.T) { `, fs) require.NoError(t, err, "bundle error") - _, err = b.Instantiate(context.Background(), logger, 0) + _, err = b.Instantiate(context.Background(), 0) require.NoError(t, err) } diff --git a/js/path_resolution_test.go b/js/path_resolution_test.go index 0a0ad1aef8a..d39592647c4 100644 --- a/js/path_resolution_test.go +++ b/js/path_resolution_test.go @@ -6,7 +6,6 @@ import ( "github.com/spf13/afero" "github.com/stretchr/testify/require" - "go.k6.io/k6/lib/testutils" ) // This whole file is about tests around https://github.com/grafana/k6/issues/2674 @@ -27,7 +26,7 @@ func TestOpenPathResolution(t *testing.T) { b, err := getSimpleBundle(t, "/path/scripts/script.js", data, fs) require.NoError(t, err) - _, err = b.Instantiate(context.Background(), testutils.NewLogger(t), 0) + _, err = b.Instantiate(context.Background(), 0) require.NoError(t, err) }) @@ -48,7 +47,7 @@ func TestOpenPathResolution(t *testing.T) { b, err := getSimpleBundle(t, "/path/totally/different/directory/script.js", data, fs) require.NoError(t, err) - _, err = b.Instantiate(context.Background(), testutils.NewLogger(t), 0) + _, err = b.Instantiate(context.Background(), 0) require.NoError(t, err) }) @@ -72,7 +71,7 @@ func TestOpenPathResolution(t *testing.T) { b, err := getSimpleBundle(t, "/path/totally/different/directory/script.js", data, fs) require.NoError(t, err) - _, err = b.Instantiate(context.Background(), testutils.NewLogger(t), 0) + _, err = b.Instantiate(context.Background(), 0) require.NoError(t, err) }) } @@ -93,7 +92,7 @@ func TestRequirePathResolution(t *testing.T) { b, err := getSimpleBundle(t, "/path/scripts/script.js", data, fs) require.NoError(t, err) - _, err = b.Instantiate(context.Background(), testutils.NewLogger(t), 0) + _, err = b.Instantiate(context.Background(), 0) require.NoError(t, err) }) @@ -114,7 +113,7 @@ func TestRequirePathResolution(t *testing.T) { b, err := getSimpleBundle(t, "/path/totally/different/directory/script.js", data, fs) require.NoError(t, err) - _, err = b.Instantiate(context.Background(), testutils.NewLogger(t), 0) + _, err = b.Instantiate(context.Background(), 0) require.NoError(t, err) }) @@ -138,7 +137,7 @@ func TestRequirePathResolution(t *testing.T) { b, err := getSimpleBundle(t, "/path/totally/different/directory/script.js", data, fs) require.NoError(t, err) - _, err = b.Instantiate(context.Background(), testutils.NewLogger(t), 0) + _, err = b.Instantiate(context.Background(), 0) require.NoError(t, err) }) } diff --git a/js/runner.go b/js/runner.go index 4f3c7fef009..a810accde3b 100644 --- a/js/runner.go +++ b/js/runner.go @@ -127,7 +127,7 @@ func (r *Runner) newVU( ctx context.Context, idLocal, idGlobal uint64, samplesOut chan<- metrics.SampleContainer, ) (*VU, error) { // Instantiate a new bundle, make a VU out of it. - bi, err := r.Bundle.Instantiate(ctx, r.preInitState.Logger, idLocal) + bi, err := r.Bundle.Instantiate(ctx, idLocal) if err != nil { return nil, err } From 7b70cf15bb2578efe784e7ff433578f82d6894f0 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Fri, 2 Dec 2022 10:40:36 +0200 Subject: [PATCH 3/7] Interrupt the JS runtime during VU init if the context was cancelled --- js/bundle.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/js/bundle.go b/js/bundle.go index e44c0ec22f6..136398e0fb5 100644 --- a/js/bundle.go +++ b/js/bundle.go @@ -306,6 +306,7 @@ func (b *Bundle) initializeProgramObject(rt *goja.Runtime, init *InitContext) pr return pgm } +//nolint:funlen func (b *Bundle) instantiate(init *InitContext, vuID uint64) (err error) { rt := init.moduleVUImpl.runtime logger := init.logger @@ -335,6 +336,20 @@ func (b *Bundle) instantiate(init *InitContext, vuID uint64) (err error) { init.moduleVUImpl.eventLoop = eventloop.New(init.moduleVUImpl) pgm := b.initializeProgramObject(rt, init) + // TODO: make something cleaner for interrupting scripts, and more unified + // (e.g. as a part of the event loop or RunWithPanicCatching()? + initCtxDone := init.moduleVUImpl.ctx.Done() + initDone := make(chan struct{}) + watchDone := make(chan struct{}) + go func() { + select { + case <-initCtxDone: + rt.Interrupt(init.moduleVUImpl.ctx.Err()) + case <-initDone: // do nothing + } + close(watchDone) + }() + err = common.RunWithPanicCatching(logger, rt, func() error { return init.moduleVUImpl.eventLoop.Start(func() error { f, errRun := rt.RunProgram(b.Program) @@ -350,6 +365,8 @@ func (b *Bundle) instantiate(init *InitContext, vuID uint64) (err error) { panic("Somehow a commonjs main module is not wrapped in a function") }) }) + close(initDone) + <-watchDone if err != nil { var exception *goja.Exception From e7a1cc30f534beedefaa59de01c80d9e4f752ffc Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Sat, 3 Dec 2022 15:35:32 +0200 Subject: [PATCH 4/7] Ensure all VUs are gracefully aborted when an init error occurs --- cmd/integration_test.go | 31 +------------------------------ core/local/local.go | 41 +++++++++++++++++++++++++---------------- 2 files changed, 26 insertions(+), 46 deletions(-) diff --git a/cmd/integration_test.go b/cmd/integration_test.go index 1f23c058cbf..7aba6e46f57 100644 --- a/cmd/integration_test.go +++ b/cmd/integration_test.go @@ -958,29 +958,7 @@ func TestAbortedByTestAbortInNonFirstInitCode(t *testing.T) { export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};} `) - // FIXME: when VU initialization is properly synchronized, replace the - // following lines with this line only: - // - // ts := testAbortedByScriptTestAbort(t, false, script, runTestWithNoLinger) - // - // See https://github.com/grafana/k6/issues/2790 for details. Right now we - // need the stdOut locking because VU initialization is not properly synchronized: - // when a test is aborted during the init phase, some logs might be emitted - // after the root command returns... - - ts := getSimpleCloudOutputTestState( - t, script, nil, lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, int(exitcodes.ScriptAborted), - ) - newRootCommand(ts.globalState).execute() - - ts.outMutex.Lock() - stdOut := ts.stdOut.String() - ts.outMutex.Unlock() - t.Log(stdOut) - assert.Contains(t, stdOut, "test aborted: foo") - assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`) - assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`) - assert.NotContains(t, stdOut, "bogus summary") + testAbortedByScriptTestAbort(t, false, script, runTestWithNoLinger) } func TestAbortedByScriptAbortInVUCode(t *testing.T) { @@ -1090,14 +1068,7 @@ func TestAbortedByScriptInitError(t *testing.T) { ) newRootCommand(ts.globalState).execute() - // FIXME: remove this locking after VU initialization accepts a context and - // is properly synchronized: currently when a test is aborted during the - // init phase, some logs might be emitted after the above command returns... - // see: https://github.com/grafana/k6/issues/2790 - ts.outMutex.Lock() stdOut := ts.stdOut.String() - ts.outMutex.Unlock() - t.Log(stdOut) assert.Contains(t, stdOut, `level=error msg="Error: oops in 2\n\tat file:///`) assert.Contains(t, stdOut, `hint="error while initializing VU #2 (script exception)"`) diff --git a/core/local/local.go b/core/local/local.go index 60312b14aee..61c9b1e59a1 100644 --- a/core/local/local.go +++ b/core/local/local.go @@ -173,15 +173,12 @@ func (e *ExecutionScheduler) initVUsConcurrently( ctx context.Context, samplesOut chan<- metrics.SampleContainer, count uint64, concurrency int, logger logrus.FieldLogger, ) chan error { - doneInits := make(chan error, count) // poor man's early-return waitgroup + doneInits := make(chan error, count) // poor man's waitgroup with results limiter := make(chan struct{}) for i := 0; i < concurrency; i++ { go func() { for range limiter { - // TODO: actually pass the context when we initialize VUs here, - // so we can cancel that initialization if there is an error, - // see https://github.com/grafana/k6/issues/2790 newVU, err := e.initVU(ctx, samplesOut, logger) if err == nil { e.state.AddInitializedVU(newVU) @@ -197,6 +194,10 @@ func (e *ExecutionScheduler) initVUsConcurrently( select { case limiter <- struct{}{}: case <-ctx.Done(): + for skipVu := vuNum; skipVu < count; skipVu++ { + // do not even start initializing the remaining VUs + doneInits <- ctx.Err() + } return } } @@ -290,23 +291,31 @@ func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics }), ) - // TODO: once VU initialization accepts a context, when a VU init fails, - // cancel the context and actually wait for all VUs to finish before this - // function returns - that way we won't have any trailing logs, see - // https://github.com/grafana/k6/issues/2790 + var initErr error for vuNum := uint64(0); vuNum < vusToInitialize; vuNum++ { + var err error select { - case err := <-doneInits: - if err != nil { - logger.WithError(err).Debug("VU initialization returned with an error, aborting...") - // the context's cancel() is called in a defer above and will - // abort any in-flight VU initializations - return err + case err = <-doneInits: + if err == nil { + atomic.AddUint64(initializedVUs, 1) } - atomic.AddUint64(initializedVUs, 1) case <-ctx.Done(): - return ctx.Err() + err = ctx.Err() + } + + if err == nil || initErr != nil { + // No error or a previous init error was already saved and we are + // just waiting for VUs to finish aborting + continue } + + logger.WithError(err).Debug("VU initialization returned with an error, aborting...") + initErr = err + cancel() + } + + if initErr != nil { + return initErr } e.state.SetInitVUFunc(func(ctx context.Context, logger *logrus.Entry) (lib.InitializedVU, error) { From 384bf5c375aca4ab1c3f9967dc45c335565701e4 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Wed, 14 Dec 2022 11:37:04 +0200 Subject: [PATCH 5/7] Simplify and improve mid-test abort mechanisms for integration tests --- cmd/integration_test.go | 172 ++++++++++++++++++++++------------------ 1 file changed, 96 insertions(+), 76 deletions(-) diff --git a/cmd/integration_test.go b/cmd/integration_test.go index 7aba6e46f57..af1cfc48e5c 100644 --- a/cmd/integration_test.go +++ b/cmd/integration_test.go @@ -619,6 +619,7 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) { t.Parallel() script := []byte(` import { Counter } from 'k6/metrics'; + import exec from 'k6/execution'; export const options = { scenarios: { @@ -642,18 +643,14 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) { tc.add(1); } - export default function () {}; + export default function () { + console.log('simple iter ' + exec.scenario.iterationInTest); + }; `) ts := getSimpleCloudOutputTestState(t, script, nil, lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, 0) - ts.globalState.signalNotify = func(c chan<- os.Signal, s ...os.Signal) { - go func() { - // simulate a Ctrl+C after 3 seconds - time.Sleep(3 * time.Second) - c <- os.Interrupt - }() - } - ts.globalState.signalStop = func(c chan<- os.Signal) { /* noop */ } + + asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "simple iter 2") newRootCommand(ts.globalState).execute() @@ -669,46 +666,80 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) { assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`) } -func TestAbortedByUserWithRestAPI(t *testing.T) { - t.Parallel() - script := []byte(` - import { sleep } from 'k6'; - export default function () { - console.log('a simple iteration') - sleep(1); - }; - - export function teardown() { - console.log('teardown() called'); - } - `) - - ts := getSimpleCloudOutputTestState( - t, script, []string{"-v", "--log-output=stdout", "--iterations", "20"}, - lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, 0, - ) - - wg := sync.WaitGroup{} +func asyncWaitForStdoutAndRun( + t *testing.T, ts *globalTestState, attempts int, interval time.Duration, expText string, callback func(), +) { + wg := &sync.WaitGroup{} wg.Add(1) - go func() { defer wg.Done() - newRootCommand(ts.globalState).execute() - }() + reachedCondition := false + for i := 0; i < attempts; i++ { + ts.outMutex.Lock() + stdOut := ts.stdOut.String() + ts.outMutex.Unlock() + + if strings.Contains(stdOut, expText) { + t.Logf("found '%s' in the process stdout on try %d at t=%s", expText, i, time.Now()) + reachedCondition = true + break + } + + t.Logf("did not find the text '%s' in the process stdout on try %d at t=%s", expText, i, time.Now()) + time.Sleep(interval) + } + if reachedCondition { + callback() + return // everything is fine + } - reachedIteration := false - for i := 0; i <= 10 && reachedIteration == false; i++ { - time.Sleep(1 * time.Second) ts.outMutex.Lock() stdOut := ts.stdOut.String() ts.outMutex.Unlock() + t.Log(stdOut) + require.FailNow( + t, "did not find the text '%s' in the process stdout after %d attempts (%s)", + expText, attempts, time.Duration(attempts)*interval, + ) + }() - if !strings.Contains(stdOut, "a simple iteration") { - t.Logf("did not see an iteration on try %d at t=%s", i, time.Now()) - continue + t.Cleanup(wg.Wait) // ensure the test waits for the goroutine to finish +} + +func asyncWaitForStdoutAndStopTestWithInterruptSignal( + t *testing.T, ts *globalTestState, attempts int, interval time.Duration, expText string, +) { + sendSignal := make(chan struct{}) + ts.globalState.signalNotify = func(c chan<- os.Signal, signals ...os.Signal) { + isAbortNotify := false + for _, s := range signals { + if s == os.Interrupt { + isAbortNotify = true + break + } + } + if !isAbortNotify { + return } + go func() { + <-sendSignal + c <- os.Interrupt + close(sendSignal) + }() + } + ts.globalState.signalStop = func(c chan<- os.Signal) { /* noop */ } + + asyncWaitForStdoutAndRun(t, ts, attempts, interval, expText, func() { + t.Log("expected stdout text was found, sending interrupt signal...") + sendSignal <- struct{}{} + <-sendSignal + }) +} - reachedIteration = true +func asyncWaitForStdoutAndStopTestFromRESTAPI( + t *testing.T, ts *globalTestState, attempts int, interval time.Duration, expText string, +) { + asyncWaitForStdoutAndRun(t, ts, attempts, interval, expText, func() { req, err := http.NewRequestWithContext( ts.ctx, http.MethodPatch, fmt.Sprintf("http://%s/v1/status", ts.flags.address), bytes.NewBufferString(`{"data":{"type":"status","id":"default","attributes":{"stopped":true}}}`), @@ -721,11 +752,32 @@ func TestAbortedByUserWithRestAPI(t *testing.T) { t.Logf("Response body: %s", body) assert.NoError(t, resp.Body.Close()) assert.Equal(t, http.StatusOK, resp.StatusCode) - } + }) +} + +func TestAbortedByUserWithRestAPI(t *testing.T) { + t.Parallel() + script := []byte(` + import { sleep } from 'k6'; + export default function () { + console.log('a simple iteration') + sleep(1); + }; + + export function teardown() { + console.log('teardown() called'); + } + `) + + ts := getSimpleCloudOutputTestState( + t, script, []string{"-v", "--log-output=stdout", "--iterations", "20"}, + lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, 0, + ) + + asyncWaitForStdoutAndStopTestFromRESTAPI(t, ts, 15, time.Second, "a simple iteration") - assert.True(t, reachedIteration) + newRootCommand(ts.globalState).execute() - wg.Wait() stdOut := ts.stdOut.String() t.Log(stdOut) assert.Contains(t, stdOut, `a simple iteration`) @@ -791,40 +843,8 @@ func runTestWithNoLinger(t *testing.T, ts *globalTestState) { func runTestWithLinger(t *testing.T, ts *globalTestState) { ts.args = append(ts.args, "--linger") - - sendSignal := make(chan struct{}) - ts.globalState.signalNotify = func(c chan<- os.Signal, s ...os.Signal) { - go func() { - <-sendSignal - c <- os.Interrupt - }() - } - ts.globalState.signalStop = func(c chan<- os.Signal) { /* noop */ } - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - newRootCommand(ts.globalState).execute() - }() - - testFinished := false - for i := 0; i <= 15 && testFinished == false; i++ { - time.Sleep(1 * time.Second) - ts.outMutex.Lock() - stdOut := ts.stdOut.String() - ts.outMutex.Unlock() - - if !strings.Contains(stdOut, "Linger set; waiting for Ctrl+C") { - t.Logf("test wasn't finished on try %d at t=%s", i, time.Now()) - continue - } - testFinished = true - close(sendSignal) - } - - require.True(t, testFinished) - wg.Wait() + asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "Linger set; waiting for Ctrl+C") + newRootCommand(ts.globalState).execute() } func TestAbortedByScriptSetupError(t *testing.T) { From 0a2defd9e37f49a25c8dd4678b1bdff8f93bbe79 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Wed, 14 Dec 2022 13:29:31 +0200 Subject: [PATCH 6/7] Add a test to check Ctrl+C interruption during the VU init phase As I've mentioned in the code comments, this test doesn't test the eventually desired behavior, it tests the currently expected behavior as it is. The current behavior is still much better than the behavior before this PR, where the VU init was interrupted without any handling and graceful stops. However, the fix to actually abort k6 with the correct exit code and run_status will be in another PR. --- cmd/integration_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/cmd/integration_test.go b/cmd/integration_test.go index af1cfc48e5c..a4178a437e6 100644 --- a/cmd/integration_test.go +++ b/cmd/integration_test.go @@ -1068,6 +1068,45 @@ func testAbortedByScriptTestAbort( return ts } +func TestAbortedByInterruptDuringVUInit(t *testing.T) { + t.Parallel() + script := []byte(` + import { sleep } from 'k6'; + export const options = { + vus: 5, + duration: '10s', + }; + + if (__VU > 1) { + console.log('VU init sleeping for a while'); + sleep(100); + } + + export default function () {}; + `) + + // TODO: fix this to exect lib.RunStatusAbortedUser and + // exitcodes.ExternalAbort + // + // This is testing the current behavior, which is expected, but it's not + // actually the desired one! See https://github.com/grafana/k6/issues/2804 + ts := getSimpleCloudOutputTestState( + t, script, nil, lib.RunStatusAbortedSystem, cloudapi.ResultStatusPassed, int(exitcodes.GenericEngine), + ) + asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "VU init sleeping for a while") + newRootCommand(ts.globalState).execute() + + stdOut := ts.stdOut.String() + t.Log(stdOut) + + assert.Contains(t, stdOut, `level=debug msg="Stopping k6 in response to signal..." sig=interrupt`) + assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`) + + // TODO: same as above, fix expected error message and run_status to 5 + assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=6 tainted=false`) + assert.Contains(t, stdOut, `level=error msg="context canceled`) +} + func TestAbortedByScriptInitError(t *testing.T) { t.Parallel() script := []byte(` From a7ad53fef43a3a7885df96e08afab88c83d24436 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Tue, 10 Jan 2023 17:08:01 +0200 Subject: [PATCH 7/7] Improve code and comments from PR suggestions --- js/initcontext.go | 4 +++- js/runner.go | 14 +++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/js/initcontext.go b/js/initcontext.go index 88320825fbe..8b8083ea2c3 100644 --- a/js/initcontext.go +++ b/js/initcontext.go @@ -70,7 +70,9 @@ func NewInitContext( moduleRegistry: getJSModules(), exportsCache: make(map[string]goja.Value), moduleVUImpl: &moduleVUImpl{ - ctx: context.Background(), // TODO: pass a real context here as well? + // TODO: pass a real context as we did for https://github.com/grafana/k6/pull/2800, + // also see https://github.com/grafana/k6/issues/2804 + ctx: context.Background(), runtime: rt, }, } diff --git a/js/runner.go b/js/runner.go index a810accde3b..a8bd4a0a9a9 100644 --- a/js/runner.go +++ b/js/runner.go @@ -358,7 +358,7 @@ func (r *Runner) IsExecutable(name string) bool { } // HandleSummary calls the specified summary callback, if supplied. -func (r *Runner) HandleSummary(parentCtx context.Context, summary *lib.Summary) (map[string]io.Reader, error) { +func (r *Runner) HandleSummary(ctx context.Context, summary *lib.Summary) (map[string]io.Reader, error) { summaryDataForJS := summarizeMetricsToObject(summary, r.Bundle.Options, r.setupData) out := make(chan metrics.SampleContainer, 100) @@ -369,10 +369,10 @@ func (r *Runner) HandleSummary(parentCtx context.Context, summary *lib.Summary) } }() - ctx, cancel := context.WithTimeout(parentCtx, r.getTimeoutFor(consts.HandleSummaryFn)) + summaryCtx, cancel := context.WithTimeout(ctx, r.getTimeoutFor(consts.HandleSummaryFn)) defer cancel() - vu, err := r.newVU(ctx, 0, 0, out) + vu, err := r.newVU(summaryCtx, 0, 0, out) if err != nil { return nil, err } @@ -386,10 +386,10 @@ func (r *Runner) HandleSummary(parentCtx context.Context, summary *lib.Summary) } go func() { - <-ctx.Done() + <-summaryCtx.Done() vu.Runtime.Interrupt(context.Canceled) }() - vu.moduleVUImpl.ctx = ctx + vu.moduleVUImpl.ctx = summaryCtx wrapper := strings.Replace(summaryWrapperLambdaCode, "/*JSLIB_SUMMARY_CODE*/", jslibSummaryCode, 1) handleSummaryWrapperRaw, err := vu.Runtime.RunString(wrapper) @@ -406,11 +406,11 @@ func (r *Runner) HandleSummary(parentCtx context.Context, summary *lib.Summary) vu.Runtime.ToValue(r.Bundle.RuntimeOptions.SummaryExport.String), vu.Runtime.ToValue(summaryDataForJS), } - rawResult, _, _, err := vu.runFn(ctx, false, handleSummaryWrapper, nil, wrapperArgs...) + rawResult, _, _, err := vu.runFn(summaryCtx, false, handleSummaryWrapper, nil, wrapperArgs...) // TODO: refactor the whole JS runner to avoid copy-pasting these complicated bits... // deadline is reached so we have timeouted but this might've not been registered correctly - if deadline, ok := ctx.Deadline(); ok && time.Now().After(deadline) { + if deadline, ok := summaryCtx.Deadline(); ok && time.Now().After(deadline) { // we could have an error that is not context.Canceled in which case we should return it instead if err, ok := err.(*goja.InterruptedError); ok && rawResult != nil && err.Value() != context.Canceled { // TODO: silence this error?