diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index bb31003ed2b..ea004345668 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -4,8 +4,10 @@ import ( "bytes" "context" "encoding/json" + "fmt" "net/http" "net/http/httptest" + "sync" "testing" "time" @@ -119,16 +121,24 @@ func TestPatchStatus(t *testing.T) { defer engine.OutputManager.StopOutputs(nil) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - run, wait, err := engine.Init(ctx, ctx) + defer cancel() + runSubCtx, runSubAbort := execution.NewTestRunContext(ctx, testState.Logger) + engine.AbortFn = runSubAbort + + run, wait, err := engine.Init(ctx, runSubCtx) require.NoError(t, err) + wg := &sync.WaitGroup{} + wg.Add(1) defer func() { - cancel() + runSubAbort(fmt.Errorf("custom cancel signal")) wait() + wg.Wait() }() go func() { - assert.ErrorContains(t, run(), "test run aborted by signal") + assert.ErrorContains(t, run(), "custom cancel signal") + wg.Done() }() // wait for the executor to initialize to avoid a potential data race below time.Sleep(200 * time.Millisecond) diff --git a/cmd/integration_test.go b/cmd/integration_test.go index 550587484f4..53ffc467bd6 100644 --- a/cmd/integration_test.go +++ b/cmd/integration_test.go @@ -656,7 +656,7 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) { logs := ts.loggerHook.Drain() assert.False(t, testutils.LogContains(logs, logrus.ErrorLevel, `some thresholds have failed`)) - assert.True(t, testutils.LogContains(logs, logrus.ErrorLevel, `test run aborted by signal`)) + assert.True(t, testutils.LogContains(logs, logrus.ErrorLevel, `test run was aborted because k6 received a 'interrupt' signal`)) stdOut := ts.stdOut.String() t.Log(stdOut) assert.Contains(t, stdOut, `✓ iterations`) diff --git a/cmd/run.go b/cmd/run.go index 0ad96351960..a960ca3c81a 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -69,6 +69,8 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { defer runCancel() logger := testRunState.Logger + runSubCtx, runSubAbort := execution.NewTestRunContext(runCtx, logger) + // Create a local execution scheduler wrapping the runner. logger.Debug("Initializing the execution scheduler...") execScheduler, err := execution.NewScheduler(testRunState) @@ -113,6 +115,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { if err != nil { return err } + engine.AbortFn = runSubAbort // Spin up the REST API server, if not disabled. if c.gs.flags.address != "" { @@ -169,7 +172,13 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { // Trap Interrupts, SIGINTs and SIGTERMs. gracefulStop := func(sig os.Signal) { logger.WithField("sig", sig).Debug("Stopping k6 in response to signal...") - lingerCancel() // stop the test run, metric processing is cancelled below + // first abort the test run this way, to propagate the error + runSubAbort(errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone( + fmt.Errorf("test run was aborted because k6 received a '%s' signal", sig), exitcodes.ExternalAbort, + ), errext.AbortedByUser, + )) + lingerCancel() // cancel this context as well, since the user did Ctrl+C } onHardStop := func(sig os.Signal) { logger.WithField("sig", sig).Error("Aborting k6 in response to signal") @@ -180,7 +189,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { // Initialize the engine initBar.Modify(pb.WithConstProgress(0, "Init VUs...")) - engineRun, engineWait, err := engine.Init(globalCtx, runCtx) + engineRun, engineWait, err := engine.Init(globalCtx, runSubCtx) if err != nil { err = common.UnwrapGojaInterruptedError(err) // Add a generic engine exit code if we don't have a more specific one diff --git a/core/engine.go b/core/engine.go index 4e3df278a0d..0a45c379353 100644 --- a/core/engine.go +++ b/core/engine.go @@ -43,7 +43,7 @@ type Engine struct { logger *logrus.Entry stopOnce sync.Once stopChan chan struct{} - abortFn func(error) // temporary + AbortFn func(error) // temporary Samples chan metrics.SampleContainer } @@ -107,24 +107,19 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait } // TODO: move all of this in a separate struct? see main TODO above - runSubCtx, runSubAbort := execution.NewTestRunContext(runCtx, e.logger) - e.abortFn = runSubAbort - - execRunResult := make(chan error) - engineRunResult := make(chan error) processMetricsAfterRun := make(chan struct{}) runFn := func() error { e.logger.Debug("Execution scheduler starting...") - err := e.ExecutionScheduler.Run(globalCtx, runSubCtx, e.Samples) - e.logger.WithError(err).Debug("Execution scheduler terminated") - - select { - case <-runCtx.Done(): - // do nothing, the test run was aborted somehow - default: - execRunResult <- err // we finished normally, so send the result + err := e.ExecutionScheduler.Run(globalCtx, runCtx, e.Samples) + if err == nil { + e.logger.Debug("Execution scheduler finished nominally") + err = runCtx.Err() + } + if err != nil { + e.logger.WithError(err).Debug("Engine run returned an error") + } else { + e.logger.Debug("Execution scheduler and engine finished nominally") } - result := <-engineRunResult // get the final result // Make the background jobs process the currently buffered metrics and // run the thresholds, then wait for that to be done. @@ -134,12 +129,10 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait case <-globalCtx.Done(): } - return result + return err } - waitFn := e.startBackgroundProcesses( - globalCtx, runCtx, execRunResult, engineRunResult, runSubAbort, processMetricsAfterRun, - ) + waitFn := e.startBackgroundProcesses(globalCtx, processMetricsAfterRun) return runFn, waitFn, nil } @@ -153,8 +146,7 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait // and that the remaining metrics samples in the pipeline should be processed as the background // process is about to exit. func (e *Engine) startBackgroundProcesses( - globalCtx, runCtx context.Context, execRunResult, engineRunResult chan error, - runSubAbort func(error), processMetricsAfterRun chan struct{}, + globalCtx context.Context, processMetricsAfterRun chan struct{}, ) (wait func()) { processes := new(sync.WaitGroup) @@ -162,32 +154,7 @@ func (e *Engine) startBackgroundProcesses( processes.Add(1) go func() { defer processes.Done() - e.processMetrics(globalCtx, processMetricsAfterRun, runSubAbort) - }() - - // Update the test run status when the test finishes - processes.Add(1) - go func() { - defer processes.Done() - var err error - defer func() { - e.logger.WithError(err).Debug("Final Engine.Run() result") - engineRunResult <- err - }() - select { - case err = <-execRunResult: - if err != nil { - e.logger.WithError(err).Debug("run: execution scheduler returned an error") - } else { - e.logger.Debug("run: execution scheduler finished nominally") - } - case <-runCtx.Done(): - e.logger.Debug("run: context expired; exiting...") - err = errext.WithAbortReasonIfNone( - errext.WithExitCodeIfNone(errors.New("test run aborted by signal"), exitcodes.ExternalAbort), - errext.AbortedByUser, - ) - } + e.processMetrics(globalCtx, processMetricsAfterRun) }() return processes.Wait @@ -200,15 +167,13 @@ func (e *Engine) startBackgroundProcesses( // The `processMetricsAfterRun` channel argument is used by the caller to signal // that the test run is finished, no more metric samples will be produced, and that // the metrics samples remaining in the pipeline should be should be processed. -func (e *Engine) processMetrics( - globalCtx context.Context, processMetricsAfterRun chan struct{}, runSubAbort func(error), -) { +func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRun chan struct{}) { sampleContainers := []metrics.SampleContainer{} // Run thresholds, if not disabled. var finalizeThresholds func() (breached []string) if !e.runtimeOptions.NoThresholds.Bool { - finalizeThresholds = e.MetricsEngine.StartThresholdCalculations(runSubAbort) + finalizeThresholds = e.MetricsEngine.StartThresholdCalculations(e.AbortFn) } ticker := time.NewTicker(collectRate) @@ -275,7 +240,7 @@ func (e *Engine) Stop() { errext.WithExitCodeIfNone(errors.New("test run stopped from the REST API"), exitcodes.ScriptStoppedFromRESTAPI), errext.AbortedByUser, ) - e.abortFn(err) + e.AbortFn(err) close(e.stopChan) }) } diff --git a/core/engine_test.go b/core/engine_test.go index 65203010d20..0c26b1e6376 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -34,11 +34,11 @@ const isWindows = runtime.GOOS == "windows" // TODO: completely rewrite all of these tests type testStruct struct { - engine *Engine - run func() error - runCancel func() - wait func() - piState *lib.TestPreInitState + engine *Engine + run func() error + runAbort func(error) + wait func() + piState *lib.TestPreInitState } func getTestPreInitState(tb testing.TB) *lib.TestPreInitState { @@ -99,16 +99,19 @@ func newTestEngineWithTestPreInitState( //nolint:golint } else { runCtx, runCancel = context.WithCancel(globalCtx) } - run, waitFn, err := engine.Init(globalCtx, runCtx) + runSubCtx, runSubAbort := execution.NewTestRunContext(runCtx, piState.Logger) + engine.AbortFn = runSubAbort + + run, waitFn, err := engine.Init(globalCtx, runSubCtx) require.NoError(t, err) var test *testStruct test = &testStruct{ - engine: engine, - run: run, - runCancel: runCancel, + engine: engine, + run: run, + runAbort: runSubAbort, wait: func() { - test.runCancel() + runCancel() globalCancel() waitFn() engine.OutputManager.StopOutputs(nil) @@ -143,7 +146,7 @@ func TestEngineRun(t *testing.T) { defer test.wait() startTime := time.Now() - assert.ErrorContains(t, test.run(), "test run aborted by signal") + assert.ErrorContains(t, test.run(), "context deadline exceeded") assert.WithinDuration(t, startTime.Add(duration), time.Now(), 100*time.Millisecond) <-done }) @@ -193,8 +196,8 @@ func TestEngineRun(t *testing.T) { errC := make(chan error) go func() { errC <- test.run() }() <-signalChan - test.runCancel() - assert.ErrorContains(t, <-errC, "test run aborted by signal") + test.runAbort(fmt.Errorf("custom error")) + assert.ErrorContains(t, <-errC, "custom error") test.wait() found := 0 @@ -1217,7 +1220,7 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { var test *testStruct runner := &minirunner.MiniRunner{ Fn: func(_ context.Context, _ *lib.State, _ chan<- metrics.SampleContainer) error { - test.runCancel() // we cancel the run immediately after the test starts + test.runAbort(fmt.Errorf("custom error")) // we cancel the run immediately after the test starts return nil }, TeardownFn: func(_ context.Context, out chan<- metrics.SampleContainer) error { @@ -1238,7 +1241,7 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { VUs: null.IntFrom(1), Iterations: null.IntFrom(1), }, piState) - assert.ErrorContains(t, test.run(), "test run aborted by signal") + assert.ErrorContains(t, test.run(), "custom error") test.wait() var count float64