From 530647265cc5290218785b71bd13a2460aa2656a Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Mon, 11 Dec 2023 12:55:32 +0200 Subject: [PATCH] Rewrite timers to be more specification compliant This is more or less a complete rewrite following the specification including leaving links to relevant parts and comments on why we diverge. Among other changes are: 1. timeout/intervals that triggered but weren't executed before they were cleared are *not* executed. 2. setInterval gets rescheduled after it's execution as per the specification. 3. setTimeout/setInterval triggers are ordered based on their invocation as well as the timeout. Again as per the specification. Fixes #3 --- go.mod | 1 - go.sum | 2 - timers/timers.go | 223 +++++++++++++++++++++++++++--------------- timers/timers_test.go | 80 ++++++++++++++- 4 files changed, 222 insertions(+), 84 deletions(-) diff --git a/go.mod b/go.mod index eea82a7..337b461 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.20 require ( github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d - github.com/mstoykov/k6-taskqueue-lib v0.1.0 github.com/stretchr/testify v1.8.4 go.k6.io/k6 v0.48.0 ) diff --git a/go.sum b/go.sum index c0a422b..a2600f1 100644 --- a/go.sum +++ b/go.sum @@ -78,8 +78,6 @@ github.com/mccutchen/go-httpbin v1.1.2-0.20190116014521-c5cb2f4802fa h1:lx8ZnNPw github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd h1:AC3N94irbx2kWGA8f/2Ks7EQl2LxKIRQYuT9IJDwgiI= github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd/go.mod h1:9vRHVuLCjoFfE3GT06X0spdOAO+Zzo4AMjdIwUHBvAk= github.com/mstoykov/envconfig v1.4.1-0.20220114105314-765c6d8c76f1 h1:94EkGmhXrVUEal+uLwFUf4fMXPhZpM5tYxuIsxrCCbI= -github.com/mstoykov/k6-taskqueue-lib v0.1.0 h1:M3eww1HSOLEN6rIkbNOJHhOVhlqnqkhYj7GTieiMBz4= -github.com/mstoykov/k6-taskqueue-lib v0.1.0/go.mod h1:PXdINulapvmzF545Auw++SCD69942FeNvUztaa9dVe4= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/timers/timers.go b/timers/timers.go index fc1a15f..c10f145 100644 --- a/timers/timers.go +++ b/timers/timers.go @@ -2,12 +2,10 @@ package timers import ( - "sync" "sync/atomic" "time" "github.com/dop251/goja" - "github.com/mstoykov/k6-taskqueue-lib/taskqueue" "go.k6.io/k6/js/modules" ) @@ -20,8 +18,16 @@ type Timers struct { vu modules.VU timerStopCounter uint32 - timerStopsLock sync.Mutex - timerStops map[uint32]chan struct{} + + timers map[int]time.Time + // it is just a list of the id in their time.Time order. + // it is used to get timers fire in sequence. + // not anything more then a slice as it is unlikely it will have too many ids to begin with. + timersQueue []int + tasks []func() error + headTimer *time.Timer + + runOnLoop func(func() error) } var ( @@ -38,8 +44,8 @@ func New() *RootModule { // a new instance for each VU. func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance { return &Timers{ - vu: vu, - timerStops: make(map[uint32]chan struct{}), + vu: vu, + timers: make(map[int]time.Time), } } @@ -55,27 +61,8 @@ func (e *Timers) Exports() modules.Exports { } } -func noop() error { return nil } - -func (e *Timers) getTimerStopCh() (uint32, chan struct{}) { - id := atomic.AddUint32(&e.timerStopCounter, 1) - ch := make(chan struct{}) - e.timerStopsLock.Lock() - e.timerStops[id] = ch - e.timerStopsLock.Unlock() - return id, ch -} - -func (e *Timers) stopTimerCh(id uint32) bool { //nolint:unparam - e.timerStopsLock.Lock() - defer e.timerStopsLock.Unlock() - ch, ok := e.timerStops[id] - if !ok { - return false - } - delete(e.timerStops, id) - close(ch) - return true +func (e *Timers) nextID() uint32 { + return atomic.AddUint32(&e.timerStopCounter, 1) } func (e *Timers) call(callback goja.Callable, args []goja.Value) error { @@ -85,70 +72,146 @@ func (e *Timers) call(callback goja.Callable, args []goja.Value) error { } func (e *Timers) setTimeout(callback goja.Callable, delay float64, args ...goja.Value) uint32 { - runOnLoop := e.vu.RegisterCallback() - id, stopCh := e.getTimerStopCh() + id := e.nextID() + e.timerInitialization(callback, delay, args, false, int(id)) + return id +} - if delay < 0 { - delay = 0 +func (e *Timers) clearTimeout(id uint32) { + _, exists := e.timers[int(id)] + if !exists { + return } - - go func() { - timer := time.NewTimer(time.Duration(delay * float64(time.Millisecond))) - defer func() { - timer.Stop() - e.stopTimerCh(id) - }() - - select { - case <-timer.C: - runOnLoop(func() error { - return e.call(callback, args) - }) - case <-stopCh: - runOnLoop(noop) - case <-e.vu.Context().Done(): - e.vu.State().Logger.Warnf("setTimeout %d was stopped because the VU iteration was interrupted", id) - runOnLoop(noop) + delete(e.timers, int(id)) + var i, otherID int + var found bool + for i, otherID = range e.timersQueue { + if id == uint32(otherID) { + found = true + break } - }() + } + if !found { + return + } + + e.timersQueue = append(e.timersQueue[:i], e.timersQueue[i+1:]...) + e.tasks = append(e.tasks[:i], e.tasks[i+1:]...) + // no need to touch the timer - if it was for this it will just do nothing and if it wasn't it will just skip it +} +func (e *Timers) setInterval(callback goja.Callable, delay float64, args ...goja.Value) uint32 { + id := e.nextID() + e.timerInitialization(callback, delay, args, true, int(id)) return id } -func (e *Timers) clearTimeout(id uint32) { - e.stopTimerCh(id) +func (e *Timers) clearInterval(id uint32) { + e.clearTimeout(id) } -func (e *Timers) setInterval(callback goja.Callable, delay float64, args ...goja.Value) uint32 { - tq := taskqueue.New(e.vu.RegisterCallback) - id, stopCh := e.getTimerStopCh() - - go func() { - ticker := time.NewTicker(time.Duration(delay * float64(time.Millisecond))) - defer func() { - e.stopTimerCh(id) - ticker.Stop() - }() - - for { - defer tq.Close() - select { - case <-ticker.C: - tq.Queue(func() error { - return e.call(callback, args) - }) - case <-stopCh: - return - case <-e.vu.Context().Done(): - e.vu.State().Logger.Warnf("setInterval %d was stopped because the VU iteration was interrupted", id) - return - } +// https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html#timer-initialisation-steps +// NOTE: previousId from the specification is always send and it is basically id +func (e *Timers) timerInitialization(callback goja.Callable, timeout float64, args []goja.Value, repeat bool, id int) { + // skip all the nesting stuff as we do not care about them + if timeout < 0 { + timeout = 0 + } + + task := func() error { + if _, exist := e.timers[id]; !exist { + return nil // 8.1 } - }() - return id + err := e.call(callback, args) + + if _, exist := e.timers[id]; !exist { // 8.4 + return err + } + + if repeat { + e.timerInitialization(callback, timeout, args, repeat, id) + } else { + delete(e.timers, id) + } + + return err + } + + e.runAfterTimeout(timeout, task, id) } -func (e *Timers) clearInterval(id uint32) { - e.stopTimerCh(id) +// https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html#run-steps-after-a-timeout +// Notes: +// orderingId is not really used in this case +// id is also required for us unlike how it is defined. Maybe in the future if this moves to core it will be expanded +func (e *Timers) runAfterTimeout(timeout float64, task func() error, id int) { + // TODO figure out a better name + delay := time.Duration(timeout * float64(time.Millisecond)) + timer := time.Now().Add(delay) + e.timers[id] = timer + + // as we have only one orderingId we have one queue + // TODO add queue type and a map of queues when/if we have more then one orderingId + var index int + // don't use range as we want to index to go over one if it needs to go to the end + for index = 0; index < len(e.timersQueue); index++ { + otherTimer := e.timers[e.timersQueue[index]] + if otherTimer.After(timer) { + break + } + } + + e.timersQueue = append(e.timersQueue, 0) + copy(e.timersQueue[index+1:], e.timersQueue[index:]) + e.timersQueue[index] = id + + e.tasks = append(e.tasks, nil) + copy(e.tasks[index+1:], e.tasks[index:]) + e.tasks[index] = task + + if index != 0 { + // we are not the earliers in the queue so we can stop here + return + } + e.setupTaskTimeout() +} + +func (e *Timers) runFirstTask() error { + e.runOnLoop = nil + tasksLen := len(e.tasks) + if tasksLen == 0 { + return nil // everything was cleared + } + + task := e.tasks[0] + copy(e.tasks, e.tasks[1:]) + e.tasks = e.tasks[:tasksLen-1] + + copy(e.timersQueue, e.timersQueue[1:]) + e.timersQueue = e.timersQueue[:tasksLen-1] + + err := task() + + if len(e.timersQueue) > 0 { + e.setupTaskTimeout() + } + return err +} + +func (e *Timers) setupTaskTimeout() { + if e.headTimer != nil { + e.headTimer.Stop() + select { + case <-e.headTimer.C: + default: + } + } + delay := -time.Since(e.timers[e.timersQueue[0]]) + if e.runOnLoop == nil { + e.runOnLoop = e.vu.RegisterCallback() + } + e.headTimer = time.AfterFunc(delay, func() { + e.runOnLoop(e.runFirstTask) + }) } diff --git a/timers/timers_test.go b/timers/timers_test.go index 1319758..1af7549 100644 --- a/timers/timers_test.go +++ b/timers/timers_test.go @@ -54,9 +54,87 @@ func TestSetInterval(t *testing.T) { print("outside setInterval") `) require.NoError(t, err) - require.Greater(t, len(log), 2) + require.Equal(t, len(log), 2) require.Equal(t, "outside setInterval", log[0]) for i, l := range log[1:] { require.Equal(t, "in setInterval", l, i) } } + +func TestSetTimeoutOrder(t *testing.T) { + t.Parallel() + runtime := modulestest.NewRuntime(t) + err := runtime.SetupModuleSystem(map[string]any{"k6/x/timers": New()}, nil, nil) + require.NoError(t, err) + + rt := runtime.VU.Runtime() + var log []string + require.NoError(t, rt.Set("print", func(s string) { log = append(log, s) })) + + _, err = rt.RunString(`globalThis.setTimeout = require("k6/x/timers").setTimeout;`) + require.NoError(t, err) + + for i := 0; i < 100; i++ { + _, err = runtime.RunOnEventLoop(` + setTimeout((_) => print("one"), 1); + setTimeout((_) => print("two"), 1); + setTimeout((_) => print("three"), 1); + setTimeout((_) => print("last"), 10); + setTimeout((_) => print("four"), 1); + setTimeout((_) => print("five"), 1); + setTimeout((_) => print("six"), 1); + print("outside setTimeout"); + `) + require.NoError(t, err) + require.Equal(t, []string{"outside setTimeout", "one", "two", "three", "four", "five", "six", "last"}, log, i) + log = log[:0] + } +} + +func TestSetIntervalOrder(t *testing.T) { + t.Parallel() + runtime := modulestest.NewRuntime(t) + err := runtime.SetupModuleSystem(map[string]any{"k6/x/timers": New()}, nil, nil) + require.NoError(t, err) + + rt := runtime.VU.Runtime() + var log []string + require.NoError(t, rt.Set("print", func(s string) { log = append(log, s) })) + + _, err = rt.RunString(`globalThis.setInterval = require("k6/x/timers").setInterval;`) + require.NoError(t, err) + + _, err = rt.RunString(`globalThis.clearInterval = require("k6/x/timers").clearInterval;`) + require.NoError(t, err) + + for i := 0; i < 100; i++ { + _, err = runtime.RunOnEventLoop(` + var one = setInterval((_) => print("one"), 1); + var two = setInterval((_) => print("two"), 1); + var last = setInterval((_) => { + print("last") + clearInterval(one); + clearInterval(two); + clearInterval(three); + clearInterval(last); + }, 4); + var three = setInterval((_) => print("three"), 1); + print("outside"); + `) + require.NoError(t, err) + require.GreaterOrEqual(t, len(log), 5) + require.Equal(t, log[0], "outside") + for i := 1; i < len(log)-1; i += 3 { + switch len(log) - i { + case 2: + require.Equal(t, log[i:i+1], []string{"one"}) + case 3: + require.Equal(t, log[i:i+2], []string{"one", "two"}) + default: + require.Equal(t, log[i:i+3], []string{"one", "two", "three"}) + } + } + require.Equal(t, log[len(log)-1], "last") + log = log[:0] + } +}