-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add basic event loop with some API to be used by modules
As well as cut down setTimeout implementation A recent update to goja introduced Promise. The catch here is that Promise's then will only be called when goja exits executing js code and it has already been resolved. Also resolving and rejecting Promises needs to happen while no other js code is being executed. This more or less necessitates adding an event loop. Additionally because a call to a k6 modules such as `k6/http` might make a promise to signal when an http request is made, but if (no changes were made) the iteration then finishes before the request completes, nothing would've stopped the start of a *new* iteration (which would probably just again ask k6/http to make a new request and return Promise). This might be a desirable behaviour for some cases but arguably will be very confusing so this commit also adds a way to Reserve(name should be changed) a place on the queue (doesn't reserve an exact spot) so that the event loop will not let the iteration finish until it gets unreserved. Additional abstraction to make a "handled" Promise is added so that k6 js-modules can use it the same way goja.NewPromise but with the key the difference that the Promise will be waited to be resolved before the event loop can end. Additionally to that, some additional code was needed so there is an event loop for all special functions calls (setup, teardown, handleSummary, default) and the init context. And finally, a basic setTimeout implementation was added. There is no way to currently cancel the setTimeout and it doesn't take code as the first argument or additional arguments to be given to the callback later on. fixes #882
- Loading branch information
Showing
8 changed files
with
401 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
/* | ||
* | ||
* k6 - a next-generation load testing tool | ||
* Copyright (C) 2021 Load Impact | ||
* | ||
* This program is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU Affero General Public License as | ||
* published by the Free Software Foundation, either version 3 of the | ||
* License, or (at your option) any later version. | ||
* | ||
* This program is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU Affero General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU Affero General Public License | ||
* along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
* | ||
*/ | ||
|
||
package local | ||
|
||
import ( | ||
"io/ioutil" | ||
"net/url" | ||
"testing" | ||
"time" | ||
|
||
"github.com/sirupsen/logrus" | ||
"github.com/stretchr/testify/require" | ||
"go.k6.io/k6/js" | ||
"go.k6.io/k6/lib" | ||
"go.k6.io/k6/lib/metrics" | ||
"go.k6.io/k6/lib/testutils" | ||
"go.k6.io/k6/lib/types" | ||
"go.k6.io/k6/loader" | ||
) | ||
|
||
func TestEventLoop(t *testing.T) { | ||
t.Parallel() | ||
script := []byte(` | ||
setTimeout(()=> {console.log("initcontext setTimeout")}, 200) | ||
console.log("initcontext"); | ||
export default function() { | ||
setTimeout(()=> {console.log("default setTimeout")}, 200) | ||
console.log("default"); | ||
}; | ||
export function setup() { | ||
setTimeout(()=> {console.log("setup setTimeout")}, 200) | ||
console.log("setup"); | ||
}; | ||
export function teardown() { | ||
setTimeout(()=> {console.log("teardown setTimeout")}, 200) | ||
console.log("teardown"); | ||
}; | ||
export function handleSummary() { | ||
setTimeout(()=> {console.log("handleSummary setTimeout")}, 200) | ||
console.log("handleSummary"); | ||
}; | ||
`) | ||
|
||
logger := logrus.New() | ||
logger.SetOutput(ioutil.Discard) | ||
logHook := testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel}} | ||
logger.AddHook(&logHook) | ||
|
||
registry := metrics.NewRegistry() | ||
builtinMetrics := metrics.RegisterBuiltinMetrics(registry) | ||
runner, err := js.New( | ||
logger, | ||
&loader.SourceData{ | ||
URL: &url.URL{Path: "/script.js"}, | ||
Data: script, | ||
}, | ||
nil, | ||
lib.RuntimeOptions{}, | ||
builtinMetrics, | ||
registry, | ||
) | ||
require.NoError(t, err) | ||
|
||
ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, | ||
lib.Options{ | ||
TeardownTimeout: types.NullDurationFrom(time.Second), | ||
SetupTimeout: types.NullDurationFrom(time.Second), | ||
}) | ||
defer cancel() | ||
|
||
errCh := make(chan error, 1) | ||
go func() { errCh <- execScheduler.Run(ctx, ctx, samples, builtinMetrics) }() | ||
|
||
select { | ||
case err := <-errCh: | ||
require.NoError(t, err) | ||
_, err = runner.HandleSummary(ctx, &lib.Summary{RootGroup: &lib.Group{}}) | ||
require.NoError(t, err) | ||
entries := logHook.Drain() | ||
msgs := make([]string, len(entries)) | ||
for i, entry := range entries { | ||
msgs[i] = entry.Message | ||
} | ||
require.Equal(t, []string{ | ||
"initcontext", // first initialization | ||
"initcontext setTimeout", | ||
"initcontext", // for vu | ||
"initcontext setTimeout", | ||
"initcontext", // for setup | ||
"initcontext setTimeout", | ||
"setup", // setup | ||
"setup setTimeout", | ||
"default", // one iteration | ||
"default setTimeout", | ||
"initcontext", // for teardown | ||
"initcontext setTimeout", | ||
"teardown", // teardown | ||
"teardown setTimeout", | ||
"initcontext", // for handleSummary | ||
"initcontext setTimeout", | ||
"handleSummary", // handleSummary | ||
"handleSummary setTimeout", | ||
}, msgs) | ||
case <-time.After(10 * time.Second): | ||
t.Fatal("timed out") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
/* | ||
* | ||
* k6 - a next-generation load testing tool | ||
* Copyright (C) 2021 Load Impact | ||
* | ||
* This program is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU Affero General Public License as | ||
* published by the Free Software Foundation, either version 3 of the | ||
* License, or (at your option) any later version. | ||
* | ||
* This program is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU Affero General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU Affero General Public License | ||
* along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
* | ||
*/ | ||
|
||
package js | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
) | ||
|
||
// an event loop | ||
// TODO: DO NOT USE AS IT'S NOT DONE | ||
type eventLoop struct { | ||
queueLock sync.Mutex | ||
queue []func() | ||
wakeupCh chan struct{} // maybe use sync.Cond ? | ||
reservedCount int | ||
} | ||
|
||
func newEventLoop() *eventLoop { | ||
return &eventLoop{ | ||
wakeupCh: make(chan struct{}, 1), | ||
} | ||
} | ||
|
||
// RunOnLoop queues the function to be called from/on the loop | ||
// This needs to be called before calling `Start` | ||
// TODO maybe have only Reserve as this is equal to `e.Reserve()(f)` | ||
func (e *eventLoop) RunOnLoop(f func()) { | ||
e.queueLock.Lock() | ||
e.queue = append(e.queue, f) | ||
e.queueLock.Unlock() | ||
select { | ||
case e.wakeupCh <- struct{}{}: | ||
default: | ||
} | ||
} | ||
|
||
// Reserve "reserves" a spot on the loop, preventing it from returning/finishing. The returning function will queue it's | ||
// argument and wakeup the loop if needed and also unreserve the spot so that the loop can exit. | ||
// this should be used instead of MakeHandledPromise if a promise will not be returned | ||
// TODO better name | ||
func (e *eventLoop) Reserve() func(func()) { | ||
e.queueLock.Lock() | ||
e.reservedCount++ | ||
e.queueLock.Unlock() | ||
|
||
return func(f func()) { | ||
e.queueLock.Lock() | ||
e.queue = append(e.queue, f) | ||
e.reservedCount-- | ||
e.queueLock.Unlock() | ||
select { | ||
case e.wakeupCh <- struct{}{}: | ||
default: | ||
} | ||
} | ||
} | ||
|
||
// Start will run the event loop until it's empty and there are no reserved spots | ||
// or the context is done | ||
//nolint:cyclop | ||
func (e *eventLoop) Start(ctx context.Context) { | ||
done := ctx.Done() | ||
for { | ||
select { // check if done | ||
case <-done: | ||
return | ||
default: | ||
} | ||
|
||
// acquire the queue | ||
e.queueLock.Lock() | ||
queue := e.queue | ||
e.queue = make([]func(), 0, len(queue)) | ||
reserved := e.reservedCount != 0 | ||
e.queueLock.Unlock() | ||
|
||
if len(queue) == 0 { | ||
if !reserved { // we have empty queue and nothing that reserved a spot | ||
return | ||
} | ||
select { // wait until the reserved is done | ||
case <-done: | ||
return | ||
case <-e.wakeupCh: | ||
} | ||
} | ||
|
||
for _, f := range queue { | ||
// run each function in the queue if not done | ||
select { | ||
case <-done: | ||
return | ||
default: | ||
f() | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
/* | ||
* | ||
* k6 - a next-generation load testing tool | ||
* Copyright (C) 2021 Load Impact | ||
* | ||
* This program is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU Affero General Public License as | ||
* published by the Free Software Foundation, either version 3 of the | ||
* License, or (at your option) any later version. | ||
* | ||
* This program is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU Affero General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU Affero General Public License | ||
* along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
* | ||
*/ | ||
|
||
package js | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestBasicEventLoop(t *testing.T) { | ||
t.Parallel() | ||
loop := newEventLoop() | ||
var ran int | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
loop.RunOnLoop(func() { ran++ }) | ||
loop.Start(ctx) | ||
require.Equal(t, ran, 1) | ||
loop.RunOnLoop(func() { ran++ }) | ||
loop.RunOnLoop(func() { ran++ }) | ||
loop.Start(ctx) | ||
require.Equal(t, ran, 3) | ||
loop.RunOnLoop(func() { ran++; cancel() }) | ||
loop.RunOnLoop(func() { ran++ }) | ||
loop.Start(ctx) | ||
require.Equal(t, ran, 4) | ||
} | ||
|
||
func TestEventLoopReserve(t *testing.T) { | ||
t.Parallel() | ||
loop := newEventLoop() | ||
var ran int | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
loop.RunOnLoop(func() { | ||
ran++ | ||
r := loop.Reserve() | ||
go func() { | ||
time.Sleep(time.Second) | ||
r(func() { | ||
ran++ | ||
}) | ||
}() | ||
}) | ||
start := time.Now() | ||
loop.Start(ctx) | ||
took := time.Since(start) | ||
require.Equal(t, ran, 2) | ||
require.Greater(t, took, time.Second) | ||
} |
Oops, something went wrong.