Skip to content

Commit

Permalink
Merge 9c9e35f into 8418147
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- authored Jan 10, 2023
2 parents 8418147 + 9c9e35f commit b641de7
Show file tree
Hide file tree
Showing 21 changed files with 414 additions and 346 deletions.
242 changes: 136 additions & 106 deletions cmd/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) {
t.Parallel()
script := []byte(`
import { Counter } from 'k6/metrics';
import exec from 'k6/execution';
export const options = {
scenarios: {
Expand All @@ -642,18 +643,14 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) {
tc.add(1);
}
export default function () {};
export default function () {
console.log('simple iter ' + exec.scenario.iterationInTest);
};
`)

ts := getSimpleCloudOutputTestState(t, script, nil, lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, 0)
ts.globalState.signalNotify = func(c chan<- os.Signal, s ...os.Signal) {
go func() {
// simulate a Ctrl+C after 3 seconds
time.Sleep(3 * time.Second)
c <- os.Interrupt
}()
}
ts.globalState.signalStop = func(c chan<- os.Signal) { /* noop */ }

asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "simple iter 2")

newRootCommand(ts.globalState).execute()

Expand All @@ -669,46 +666,80 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) {
assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`)
}

func TestAbortedByUserWithRestAPI(t *testing.T) {
t.Parallel()
script := []byte(`
import { sleep } from 'k6';
export default function () {
console.log('a simple iteration')
sleep(1);
};
export function teardown() {
console.log('teardown() called');
}
`)

ts := getSimpleCloudOutputTestState(
t, script, []string{"-v", "--log-output=stdout", "--iterations", "20"},
lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, 0,
)

wg := sync.WaitGroup{}
func asyncWaitForStdoutAndRun(
t *testing.T, ts *globalTestState, attempts int, interval time.Duration, expText string, callback func(),
) {
wg := &sync.WaitGroup{}
wg.Add(1)

go func() {
defer wg.Done()
newRootCommand(ts.globalState).execute()
}()
reachedCondition := false
for i := 0; i < attempts; i++ {
ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()

if strings.Contains(stdOut, expText) {
t.Logf("found '%s' in the process stdout on try %d at t=%s", expText, i, time.Now())
reachedCondition = true
break
}

t.Logf("did not find the text '%s' in the process stdout on try %d at t=%s", expText, i, time.Now())
time.Sleep(interval)
}
if reachedCondition {
callback()
return // everything is fine
}

reachedIteration := false
for i := 0; i <= 10 && reachedIteration == false; i++ {
time.Sleep(1 * time.Second)
ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()
t.Log(stdOut)
require.FailNow(
t, "did not find the text '%s' in the process stdout after %d attempts (%s)",
expText, attempts, time.Duration(attempts)*interval,
)
}()

if !strings.Contains(stdOut, "a simple iteration") {
t.Logf("did not see an iteration on try %d at t=%s", i, time.Now())
continue
t.Cleanup(wg.Wait) // ensure the test waits for the goroutine to finish
}

func asyncWaitForStdoutAndStopTestWithInterruptSignal(
t *testing.T, ts *globalTestState, attempts int, interval time.Duration, expText string,
) {
sendSignal := make(chan struct{})
ts.globalState.signalNotify = func(c chan<- os.Signal, signals ...os.Signal) {
isAbortNotify := false
for _, s := range signals {
if s == os.Interrupt {
isAbortNotify = true
break
}
}
if !isAbortNotify {
return
}
go func() {
<-sendSignal
c <- os.Interrupt
close(sendSignal)
}()
}
ts.globalState.signalStop = func(c chan<- os.Signal) { /* noop */ }

reachedIteration = true
asyncWaitForStdoutAndRun(t, ts, attempts, interval, expText, func() {
t.Log("expected stdout text was found, sending interrupt signal...")
sendSignal <- struct{}{}
<-sendSignal
})
}

func asyncWaitForStdoutAndStopTestFromRESTAPI(
t *testing.T, ts *globalTestState, attempts int, interval time.Duration, expText string,
) {
asyncWaitForStdoutAndRun(t, ts, attempts, interval, expText, func() {
req, err := http.NewRequestWithContext(
ts.ctx, http.MethodPatch, fmt.Sprintf("http://%s/v1/status", ts.flags.address),
bytes.NewBufferString(`{"data":{"type":"status","id":"default","attributes":{"stopped":true}}}`),
Expand All @@ -721,11 +752,32 @@ func TestAbortedByUserWithRestAPI(t *testing.T) {
t.Logf("Response body: %s", body)
assert.NoError(t, resp.Body.Close())
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
})
}

func TestAbortedByUserWithRestAPI(t *testing.T) {
t.Parallel()
script := []byte(`
import { sleep } from 'k6';
export default function () {
console.log('a simple iteration')
sleep(1);
};
export function teardown() {
console.log('teardown() called');
}
`)

ts := getSimpleCloudOutputTestState(
t, script, []string{"-v", "--log-output=stdout", "--iterations", "20"},
lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, 0,
)

assert.True(t, reachedIteration)
asyncWaitForStdoutAndStopTestFromRESTAPI(t, ts, 15, time.Second, "a simple iteration")

newRootCommand(ts.globalState).execute()

wg.Wait()
stdOut := ts.stdOut.String()
t.Log(stdOut)
assert.Contains(t, stdOut, `a simple iteration`)
Expand Down Expand Up @@ -791,40 +843,8 @@ func runTestWithNoLinger(t *testing.T, ts *globalTestState) {

func runTestWithLinger(t *testing.T, ts *globalTestState) {
ts.args = append(ts.args, "--linger")

sendSignal := make(chan struct{})
ts.globalState.signalNotify = func(c chan<- os.Signal, s ...os.Signal) {
go func() {
<-sendSignal
c <- os.Interrupt
}()
}
ts.globalState.signalStop = func(c chan<- os.Signal) { /* noop */ }

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
newRootCommand(ts.globalState).execute()
}()

testFinished := false
for i := 0; i <= 15 && testFinished == false; i++ {
time.Sleep(1 * time.Second)
ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()

if !strings.Contains(stdOut, "Linger set; waiting for Ctrl+C") {
t.Logf("test wasn't finished on try %d at t=%s", i, time.Now())
continue
}
testFinished = true
close(sendSignal)
}

require.True(t, testFinished)
wg.Wait()
asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "Linger set; waiting for Ctrl+C")
newRootCommand(ts.globalState).execute()
}

func TestAbortedByScriptSetupError(t *testing.T) {
Expand Down Expand Up @@ -958,29 +978,7 @@ func TestAbortedByTestAbortInNonFirstInitCode(t *testing.T) {
export function handleSummary() { return {stdout: '\n\n\nbogus summary\n\n\n'};}
`)

// FIXME: when VU initialization is properly synchronized, replace the
// following lines with this line only:
//
// ts := testAbortedByScriptTestAbort(t, false, script, runTestWithNoLinger)
//
// See https://github.com/grafana/k6/issues/2790 for details. Right now we
// need the stdOut locking because VU initialization is not properly synchronized:
// when a test is aborted during the init phase, some logs might be emitted
// after the root command returns...

ts := getSimpleCloudOutputTestState(
t, script, nil, lib.RunStatusAbortedUser, cloudapi.ResultStatusPassed, int(exitcodes.ScriptAborted),
)
newRootCommand(ts.globalState).execute()

ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()
t.Log(stdOut)
assert.Contains(t, stdOut, "test aborted: foo")
assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`)
assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)
assert.NotContains(t, stdOut, "bogus summary")
testAbortedByScriptTestAbort(t, false, script, runTestWithNoLinger)
}

func TestAbortedByScriptAbortInVUCode(t *testing.T) {
Expand Down Expand Up @@ -1070,6 +1068,45 @@ func testAbortedByScriptTestAbort(
return ts
}

func TestAbortedByInterruptDuringVUInit(t *testing.T) {
t.Parallel()
script := []byte(`
import { sleep } from 'k6';
export const options = {
vus: 5,
duration: '10s',
};
if (__VU > 1) {
console.log('VU init sleeping for a while');
sleep(100);
}
export default function () {};
`)

// TODO: fix this to exect lib.RunStatusAbortedUser and
// exitcodes.ExternalAbort
//
// This is testing the current behavior, which is expected, but it's not
// actually the desired one! See https://github.com/grafana/k6/issues/2804
ts := getSimpleCloudOutputTestState(
t, script, nil, lib.RunStatusAbortedSystem, cloudapi.ResultStatusPassed, int(exitcodes.GenericEngine),
)
asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "VU init sleeping for a while")
newRootCommand(ts.globalState).execute()

stdOut := ts.stdOut.String()
t.Log(stdOut)

assert.Contains(t, stdOut, `level=debug msg="Stopping k6 in response to signal..." sig=interrupt`)
assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`)

// TODO: same as above, fix expected error message and run_status to 5
assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=6 tainted=false`)
assert.Contains(t, stdOut, `level=error msg="context canceled`)
}

func TestAbortedByScriptInitError(t *testing.T) {
t.Parallel()
script := []byte(`
Expand All @@ -1090,14 +1127,7 @@ func TestAbortedByScriptInitError(t *testing.T) {
)
newRootCommand(ts.globalState).execute()

// FIXME: remove this locking after VU initialization accepts a context and
// is properly synchronized: currently when a test is aborted during the
// init phase, some logs might be emitted after the above command returns...
// see: https://github.com/grafana/k6/issues/2790
ts.outMutex.Lock()
stdOut := ts.stdOut.String()
ts.outMutex.Unlock()

t.Log(stdOut)
assert.Contains(t, stdOut, `level=error msg="Error: oops in 2\n\tat file:///`)
assert.Contains(t, stdOut, `hint="error while initializing VU #2 (script exception)"`)
Expand Down
Loading

0 comments on commit b641de7

Please sign in to comment.