Skip to content

Commit

Permalink
Add basic event loop with an API to be used by modules
Browse files Browse the repository at this point in the history
As well as cut down setTimeout implementation.

A recent update to goja introduced support for ECMAscript Promise.

The catch here is that Promise's then will only be called when goja
exits executing js code and it has already been resolved.
Also resolving and rejecting Promises needs to happen while no other
js code is being executed as it will otherwise lead to a data race.

This more or less necessitates adding an event loop. Additionally
because a call to a k6 modules such as `k6/http` might make a promise to
signal when an http request is made, but if (no changes were made) the
iteration then finishes before the request completes, nothing would've
stopped the start of a *new* iteration, which would probably just again
ask k6/http to make a new request and return Promise.

This might be a desirable behaviour for some cases but arguably will be
very confusing so this commit also adds a way to Reserve(name should be
changed) a place on the queue so that the event loop will not let the
iteration finish until it gets unreserved.

Additionally to that, some additional code was needed so there is an
event loop for all special functions calls (setup, teardown,
handleSummary, default) and the init context.

This also adds handling of rejected promise which don't have a reject
handler similar to what deno does.

It also adds a per iteration context that gets canceled on the end of
each iteration letting other code know that it needs to stop. This is
particularly needed here as if an iteration gets aborted by a syntax
error (or unhandled promise rejection), a new iteration will start right
after that. But this means that any in-flight asynchronous operation (an
http requests for example) will *not* get stopped. With a context that
gets canceled every time module code can notice that and abort any
operation. For this same reason the event loop need to be waited to be
*empty* before the iteration ends.

This did lead to some ... not very nice code, but a whole package needs
a big refactor which will likely happen once common.Bind and co gets
removed.

And finally, a basic setTimeout implementation was added.
There is no way to currently cancel the setTimeout aka no clearTimeout.

This likely needs to be extended but this can definitely wait. Or we
might decide to actually drop setTimeout.

fixes #882
  • Loading branch information
mstoykov committed Feb 8, 2022
1 parent 9c58b15 commit 6436d4a
Show file tree
Hide file tree
Showing 8 changed files with 578 additions and 28 deletions.
237 changes: 237 additions & 0 deletions core/local/eventloop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package local

import (
"io/ioutil"
"net/url"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/loader"
)

func TestEventLoop(t *testing.T) {
t.Parallel()
script := []byte(`
setTimeout(()=> {console.log("initcontext setTimeout")}, 200)
console.log("initcontext");
export default function() {
setTimeout(()=> {console.log("default setTimeout")}, 200)
console.log("default");
};
export function setup() {
setTimeout(()=> {console.log("setup setTimeout")}, 200)
console.log("setup");
};
export function teardown() {
setTimeout(()=> {console.log("teardown setTimeout")}, 200)
console.log("teardown");
};
export function handleSummary() {
setTimeout(()=> {console.log("handleSummary setTimeout")}, 200)
console.log("handleSummary");
};
`)

logger := logrus.New()
logger.SetOutput(ioutil.Discard)
logHook := testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel}}
logger.AddHook(&logHook)

registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
runner, err := js.New(
logger,
&loader.SourceData{
URL: &url.URL{Path: "/script.js"},
Data: script,
},
nil,
lib.RuntimeOptions{},
builtinMetrics,
registry,
)
require.NoError(t, err)

ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger,
lib.Options{
TeardownTimeout: types.NullDurationFrom(time.Second),
SetupTimeout: types.NullDurationFrom(time.Second),
})
defer cancel()

errCh := make(chan error, 1)
go func() { errCh <- execScheduler.Run(ctx, ctx, samples, builtinMetrics) }()

select {
case err := <-errCh:
require.NoError(t, err)
_, err = runner.HandleSummary(ctx, &lib.Summary{RootGroup: &lib.Group{}})
require.NoError(t, err)
entries := logHook.Drain()
msgs := make([]string, len(entries))
for i, entry := range entries {
msgs[i] = entry.Message
}
require.Equal(t, []string{
"initcontext", // first initialization
"initcontext setTimeout",
"initcontext", // for vu
"initcontext setTimeout",
"initcontext", // for setup
"initcontext setTimeout",
"setup", // setup
"setup setTimeout",
"default", // one iteration
"default setTimeout",
"initcontext", // for teardown
"initcontext setTimeout",
"teardown", // teardown
"teardown setTimeout",
"initcontext", // for handleSummary
"initcontext setTimeout",
"handleSummary", // handleSummary
"handleSummary setTimeout",
}, msgs)
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
}

func TestEventLoopCrossScenario(t *testing.T) {
t.Parallel()
// TODO refactor the repeating parts here and the previous test
script := []byte(`
import exec from "k6/execution"
export const options = {
scenarios: {
"first":{
executor: "shared-iterations",
maxDuration: "1s",
iterations: 1,
vus: 1,
gracefulStop:"1s",
},
"second": {
executor: "shared-iterations",
maxDuration: "1s",
iterations: 1,
vus: 1,
startTime: "3s",
}
}
}
export default function() {
let i = exec.scenario.name
setTimeout(()=> {console.log(i)}, 3000)
}
`)

logger := logrus.New()
logger.SetOutput(ioutil.Discard)
logHook := testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.ErrorLevel, logrus.WarnLevel, logrus.InfoLevel}}
logger.AddHook(&logHook)

registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
runner, err := js.New(
logger,
&loader.SourceData{
URL: &url.URL{Path: "/script.js"},
Data: script,
},
nil,
lib.RuntimeOptions{},
builtinMetrics,
registry,
)
require.NoError(t, err)
options := runner.GetOptions()

ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, options)
defer cancel()

errCh := make(chan error, 1)
go func() { errCh <- execScheduler.Run(ctx, ctx, samples, builtinMetrics) }()

select {
case err := <-errCh:
require.NoError(t, err)
entries := logHook.Drain()
msgs := make([]string, len(entries))
for i, entry := range entries {
msgs[i] = entry.Message
}
require.Equal(t, []string{"second"}, msgs)
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
}

func TestEventLoopCrossIterations(t *testing.T) {
t.Parallel()
// TODO refactor the repeating parts here and the previous test
script := []byte(`
import { sleep } from "k6"
export const options = {
iterations: 2,
vus: 1,
}
export default function() {
let i = __ITER;
setTimeout(()=> { console.log(i) }, 1000)
if (__ITER == 0) {
throw "just error"
} else {
sleep(1)
}
}
`)

logger := logrus.New()
logger.SetOutput(ioutil.Discard)
logHook := testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel, logrus.WarnLevel, logrus.ErrorLevel}}
logger.AddHook(&logHook)

registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
runner, err := js.New(
logger,
&loader.SourceData{
URL: &url.URL{Path: "/script.js"},
Data: script,
},
nil,
lib.RuntimeOptions{},
builtinMetrics,
registry,
)
require.NoError(t, err)
options := runner.GetOptions()

ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, options)
defer cancel()

errCh := make(chan error, 1)
go func() { errCh <- execScheduler.Run(ctx, ctx, samples, builtinMetrics) }()

select {
case err := <-errCh:
require.NoError(t, err)
entries := logHook.Drain()
msgs := make([]string, len(entries))
for i, entry := range entries {
msgs[i] = entry.Message
}
require.Equal(t, []string{"just error\n\tat /script.js:12:4(13)\n\tat native\n", "1"}, msgs)
case <-time.After(10 * time.Second):
t.Fatal("timed out")
}
}
7 changes: 6 additions & 1 deletion js/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,12 @@ func (b *Bundle) instantiate(logger logrus.FieldLogger, rt *goja.Runtime, init *
"require": init.Require,
"open": init.Open,
})
if _, err := rt.RunProgram(b.Program); err != nil {
init.moduleVUImpl.eventLoop.initHelpers(ctx, rt)
err := init.moduleVUImpl.eventLoop.start(func() error {
_, err := rt.RunProgram(b.Program)
return err
})
if err != nil {
var exception *goja.Exception
if errors.As(err, &exception) {
err = &scriptException{inner: exception}
Expand Down
Loading

0 comments on commit 6436d4a

Please sign in to comment.