Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass and respect a Context when initializing VUs #2800

Merged
merged 7 commits into from
Jan 11, 2023
Merged
242 changes: 136 additions & 106 deletions cmd/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) {
t.Parallel()
script := []byte(`
import { Counter } from 'k6/metrics';
import exec from 'k6/execution';
export const options = {
scenarios: {
Expand All @@ -642,18 +643,14 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) {
tc.add(1);
}
export default function () {};
export default function () {
console.log('simple iter ' + exec.scenario.iterationInTest);
};
`)

ts := getSimpleCloudOutputTestState(t, script, nil, lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, 0)
ts.globalState.signalNotify = func(c chan<- os.Signal, s ...os.Signal) {
go func() {
// simulate a Ctrl+C after 3 seconds
time.Sleep(3 * time.Second)
c <- os.Interrupt
}()
}
ts.globalState.signalStop = func(c chan<- os.Signal) { /* noop */ }

asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "simple iter 2")

newRootCommand(ts.globalState).execute()

Expand All @@ -669,46 +666,80 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) {
assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`)
}

func TestAbortedByUserWithRestAPI(t *testing.T) {
t.Parallel()
script := []byte(`
import { sleep } from 'k6';
export default function () {
console.log('a simple iteration')
sleep(1);
};
export function teardown() {
console.log('teardown() called');
}
`)

ts := getSimpleCloudOutputTestState(
t, script, []string{"-v", "--log-output=stdout", "--iterations", "20"},
lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, 0,
)

wg := sync.WaitGroup{}
func asyncWaitForStdoutAndRun(
t *testing.T, ts *globalTestState, attempts int, interval time.Duration, expText string, callback func(),
) {
wg := &sync.WaitGroup{}
wg.Add(1)

go func() {
defer wg.Done()
newRootCommand(ts.globalState).execute()
}()
reachedCondition := false
for i := 0; i < attempts; i++ {
ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()

if strings.Contains(stdOut, expText) {
t.Logf("found '%s' in the process stdout on try %d at t=%s", expText, i, time.Now())
reachedCondition = true
break
}

t.Logf("did not find the text '%s' in the process stdout on try %d at t=%s", expText, i, time.Now())
time.Sleep(interval)
}
if reachedCondition {
callback()
return // everything is fine
}

reachedIteration := false
for i := 0; i <= 10 && reachedIteration == false; i++ {
time.Sleep(1 * time.Second)
ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()
t.Log(stdOut)
require.FailNow(
t, "did not find the text '%s' in the process stdout after %d attempts (%s)",
expText, attempts, time.Duration(attempts)*interval,
)
}()

if !strings.Contains(stdOut, "a simple iteration") {
t.Logf("did not see an iteration on try %d at t=%s", i, time.Now())
continue
t.Cleanup(wg.Wait) // ensure the test waits for the goroutine to finish
}

func asyncWaitForStdoutAndStopTestWithInterruptSignal(
t *testing.T, ts *globalTestState, attempts int, interval time.Duration, expText string,
) {
sendSignal := make(chan struct{})
ts.globalState.signalNotify = func(c chan<- os.Signal, signals ...os.Signal) {
isAbortNotify := false
for _, s := range signals {
if s == os.Interrupt {
isAbortNotify = true
break
}
}
if !isAbortNotify {
return
}
go func() {
<-sendSignal
c <- os.Interrupt
close(sendSignal)
}()
}
ts.globalState.signalStop = func(c chan<- os.Signal) { /* noop */ }

reachedIteration = true
asyncWaitForStdoutAndRun(t, ts, attempts, interval, expText, func() {
t.Log("expected stdout text was found, sending interrupt signal...")
sendSignal <- struct{}{}
<-sendSignal
})
}

func asyncWaitForStdoutAndStopTestFromRESTAPI(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is only called from TestAbortedByUserWithRestAPI, and just wraps another helper function. So I would inline the asyncWaitForStdoutAndRun() call, and remove this.

asyncWaitForStdoutAndRun() is also called with the same arguments, except for the callback func. So I would drop the unneded arguments, and add them once they're needed.

In general, abstractions add a layer of indirection that complicates code comprehension, so I wouldn't use them so liberally, and would prefer even some duplication between tests. Or, if tests really are similar, turn them into table tests and avoid the helper functions.

Copy link
Member Author

@na-- na-- Jan 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree and I was about to fix this, but I think the "fix" here is to just add more tests that abort the test run via the k6 REST API 😅 See #2804, we need more tests about this behavior... 😞

In #2815 or soon after it, we should be able to nicely abort the test with the REST even during VU init or setup(), so I'd prefer to keep this helper method here and add more tests that use it in that PR.

t *testing.T, ts *globalTestState, attempts int, interval time.Duration, expText string,
) {
asyncWaitForStdoutAndRun(t, ts, attempts, interval, expText, func() {
req, err := http.NewRequestWithContext(
ts.ctx, http.MethodPatch, fmt.Sprintf("http://%s/v1/status", ts.flags.address),
bytes.NewBufferString(`{"data":{"type":"status","id":"default","attributes":{"stopped":true}}}`),
Expand All @@ -721,11 +752,32 @@ func TestAbortedByUserWithRestAPI(t *testing.T) {
t.Logf("Response body: %s", body)
assert.NoError(t, resp.Body.Close())
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
})
}

func TestAbortedByUserWithRestAPI(t *testing.T) {
t.Parallel()
script := []byte(`
import { sleep } from 'k6';
export default function () {
console.log('a simple iteration')
sleep(1);
};
export function teardown() {
console.log('teardown() called');
}
`)

ts := getSimpleCloudOutputTestState(
t, script, []string{"-v", "--log-output=stdout", "--iterations", "20"},
lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, 0,
)

assert.True(t, reachedIteration)
asyncWaitForStdoutAndStopTestFromRESTAPI(t, ts, 15, time.Second, "a simple iteration")

newRootCommand(ts.globalState).execute()

wg.Wait()
stdOut := ts.stdOut.String()
t.Log(stdOut)
assert.Contains(t, stdOut, `a simple iteration`)
Expand Down Expand Up @@ -791,40 +843,8 @@ func runTestWithNoLinger(t *testing.T, ts *globalTestState) {

func runTestWithLinger(t *testing.T, ts *globalTestState) {
ts.args = append(ts.args, "--linger")

sendSignal := make(chan struct{})
ts.globalState.signalNotify = func(c chan<- os.Signal, s ...os.Signal) {
go func() {
<-sendSignal
c <- os.Interrupt
}()
}
ts.globalState.signalStop = func(c chan<- os.Signal) { /* noop */ }

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
newRootCommand(ts.globalState).execute()
}()

testFinished := false
for i := 0; i <= 15 && testFinished == false; i++ {
time.Sleep(1 * time.Second)
ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()

if !strings.Contains(stdOut, "Linger set; waiting for Ctrl+C") {
t.Logf("test wasn't finished on try %d at t=%s", i, time.Now())
continue
}
testFinished = true
close(sendSignal)
}

require.True(t, testFinished)
wg.Wait()
asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "Linger set; waiting for Ctrl+C")
newRootCommand(ts.globalState).execute()
}

func TestAbortedByScriptSetupError(t *testing.T) {
Expand Down Expand Up @@ -958,29 +978,7 @@ func TestAbortedByTestAbortInNonFirstInitCode(t *testing.T) {
export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};}
`)

// FIXME: when VU initialization is properly synchronized, replace the
// following lines with this line only:
//
// ts := testAbortedByScriptTestAbort(t, false, script, runTestWithNoLinger)
//
// See https://github.com/grafana/k6/issues/2790 for details. Right now we
// need the stdOut locking because VU initialization is not properly synchronized:
// when a test is aborted during the init phase, some logs might be emitted
// after the root command returns...

ts := getSimpleCloudOutputTestState(
t, script, nil, lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, int(exitcodes.ScriptAborted),
)
newRootCommand(ts.globalState).execute()

ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()
t.Log(stdOut)
assert.Contains(t, stdOut, "test aborted: foo")
assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`)
assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)
assert.NotContains(t, stdOut, "bogus summary")
testAbortedByScriptTestAbort(t, false, script, runTestWithNoLinger)
}

func TestAbortedByScriptAbortInVUCode(t *testing.T) {
Expand Down Expand Up @@ -1070,6 +1068,45 @@ func testAbortedByScriptTestAbort(
return ts
}

func TestAbortedByInterruptDuringVUInit(t *testing.T) {
t.Parallel()
script := []byte(`
import { sleep } from 'k6';
export const options = {
vus: 5,
duration: '10s',
};
if (__VU > 1) {
console.log('VU init sleeping for a while');
sleep(100);
}
export default function () {};
`)

// TODO: fix this to exect lib.RunStatusAbortedUser and
// exitcodes.ExternalAbort
//
// This is testing the current behavior, which is expected, but it's not
// actually the desired one! See https://github.com/grafana/k6/issues/2804
ts := getSimpleCloudOutputTestState(
t, script, nil, lib.RunStatusAbortedSystem, cloudapi.ResultStatusPassed, int(exitcodes.GenericEngine),
)
asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "VU init sleeping for a while")
newRootCommand(ts.globalState).execute()

stdOut := ts.stdOut.String()
t.Log(stdOut)

assert.Contains(t, stdOut, `level=debug msg="Stopping k6 in response to signal..." sig=interrupt`)
assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)

// TODO: same as above, fix expected error message and run_status to 5
assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=6 tainted=false`)
assert.Contains(t, stdOut, `level=error msg="context canceled`)
}

func TestAbortedByScriptInitError(t *testing.T) {
t.Parallel()
script := []byte(`
Expand All @@ -1090,14 +1127,7 @@ func TestAbortedByScriptInitError(t *testing.T) {
)
newRootCommand(ts.globalState).execute()

// FIXME: remove this locking after VU initialization accepts a context and
// is properly synchronized: currently when a test is aborted during the
// init phase, some logs might be emitted after the above command returns...
// see: https://github.com/grafana/k6/issues/2790
ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()

t.Log(stdOut)
assert.Contains(t, stdOut, `level=error msg="Error: oops in 2\n\tat file:///`)
assert.Contains(t, stdOut, `hint="error while initializing VU #2 (script exception)"`)
Expand Down
Loading