Skip to content
This repository has been archived by the owner on Oct 16, 2024. It is now read-only.

Commit

Permalink
Rewrite timers to be more specification compliant
Browse files Browse the repository at this point in the history
This is more or less a complete rewrite following the specification
including leaving links to relevant parts and comments on why we
diverge.

Among other changes are:
1. timeout/intervals that triggered but weren't executed before they
   were cleared are *not* executed.
2. setInterval gets rescheduled after it's execution as per the
   specification.
3. setTimeout/setInterval triggers are ordered based on their invocation
   as well as the timeout. Again as per the specification.

Fixes #3
  • Loading branch information
mstoykov committed Dec 11, 2023
1 parent abc1b57 commit d8239b9
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 84 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.20

require (
github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d
github.com/mstoykov/k6-taskqueue-lib v0.1.0
github.com/stretchr/testify v1.8.4
go.k6.io/k6 v0.48.0
)
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ github.com/mccutchen/go-httpbin v1.1.2-0.20190116014521-c5cb2f4802fa h1:lx8ZnNPw
github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd h1:AC3N94irbx2kWGA8f/2Ks7EQl2LxKIRQYuT9IJDwgiI=
github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd/go.mod h1:9vRHVuLCjoFfE3GT06X0spdOAO+Zzo4AMjdIwUHBvAk=
github.com/mstoykov/envconfig v1.4.1-0.20220114105314-765c6d8c76f1 h1:94EkGmhXrVUEal+uLwFUf4fMXPhZpM5tYxuIsxrCCbI=
github.com/mstoykov/k6-taskqueue-lib v0.1.0 h1:M3eww1HSOLEN6rIkbNOJHhOVhlqnqkhYj7GTieiMBz4=
github.com/mstoykov/k6-taskqueue-lib v0.1.0/go.mod h1:PXdINulapvmzF545Auw++SCD69942FeNvUztaa9dVe4=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
223 changes: 143 additions & 80 deletions timers/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
package timers

import (
"sync"
"sync/atomic"
"time"

"github.com/dop251/goja"
"github.com/mstoykov/k6-taskqueue-lib/taskqueue"
"go.k6.io/k6/js/modules"
)

Expand All @@ -20,8 +18,16 @@ type Timers struct {
vu modules.VU

timerStopCounter uint32
timerStopsLock sync.Mutex
timerStops map[uint32]chan struct{}

timers map[int]time.Time
// it is just a list of the id in their time.Time order.
// it is used to get timers fire in sequence.
// not anything more then a slice as it is unlikely it will have too many ids to begin with.
timersQueue []int
tasks []func() error
headTimer *time.Timer

runOnLoop func(func() error)
}

var (
Expand All @@ -38,8 +44,8 @@ func New() *RootModule {
// a new instance for each VU.
func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
return &Timers{
vu: vu,
timerStops: make(map[uint32]chan struct{}),
vu: vu,
timers: make(map[int]time.Time),
}
}

Expand All @@ -55,27 +61,8 @@ func (e *Timers) Exports() modules.Exports {
}
}

func noop() error { return nil }

func (e *Timers) getTimerStopCh() (uint32, chan struct{}) {
id := atomic.AddUint32(&e.timerStopCounter, 1)
ch := make(chan struct{})
e.timerStopsLock.Lock()
e.timerStops[id] = ch
e.timerStopsLock.Unlock()
return id, ch
}

func (e *Timers) stopTimerCh(id uint32) bool { //nolint:unparam
e.timerStopsLock.Lock()
defer e.timerStopsLock.Unlock()
ch, ok := e.timerStops[id]
if !ok {
return false
}
delete(e.timerStops, id)
close(ch)
return true
func (e *Timers) nextID() uint32 {
return atomic.AddUint32(&e.timerStopCounter, 1)
}

func (e *Timers) call(callback goja.Callable, args []goja.Value) error {
Expand All @@ -85,70 +72,146 @@ func (e *Timers) call(callback goja.Callable, args []goja.Value) error {
}

func (e *Timers) setTimeout(callback goja.Callable, delay float64, args ...goja.Value) uint32 {
runOnLoop := e.vu.RegisterCallback()
id, stopCh := e.getTimerStopCh()
id := e.nextID()
e.timerInitialization(callback, delay, args, false, int(id))
return id
}

if delay < 0 {
delay = 0
func (e *Timers) clearTimeout(id uint32) {
_, exists := e.timers[int(id)]
if !exists {
return
}

go func() {
timer := time.NewTimer(time.Duration(delay * float64(time.Millisecond)))
defer func() {
timer.Stop()
e.stopTimerCh(id)
}()

select {
case <-timer.C:
runOnLoop(func() error {
return e.call(callback, args)
})
case <-stopCh:
runOnLoop(noop)
case <-e.vu.Context().Done():
e.vu.State().Logger.Warnf("setTimeout %d was stopped because the VU iteration was interrupted", id)
runOnLoop(noop)
delete(e.timers, int(id))
var i, otherID int
var found bool
for i, otherID = range e.timersQueue {
if id == uint32(otherID) {
found = true
break
}
}()
}
if !found {
return
}

e.timersQueue = append(e.timersQueue[:i], e.timersQueue[i+1:]...)
e.tasks = append(e.tasks[:i], e.tasks[i+1:]...)
// no need to touch the timer - if it was for this it will just do nothing and if it wasn't it will just skip it
}

func (e *Timers) setInterval(callback goja.Callable, delay float64, args ...goja.Value) uint32 {
id := e.nextID()
e.timerInitialization(callback, delay, args, true, int(id))
return id
}

func (e *Timers) clearTimeout(id uint32) {
e.stopTimerCh(id)
func (e *Timers) clearInterval(id uint32) {
e.clearTimeout(id)
}

func (e *Timers) setInterval(callback goja.Callable, delay float64, args ...goja.Value) uint32 {
tq := taskqueue.New(e.vu.RegisterCallback)
id, stopCh := e.getTimerStopCh()

go func() {
ticker := time.NewTicker(time.Duration(delay * float64(time.Millisecond)))
defer func() {
e.stopTimerCh(id)
ticker.Stop()
}()

for {
defer tq.Close()
select {
case <-ticker.C:
tq.Queue(func() error {
return e.call(callback, args)
})
case <-stopCh:
return
case <-e.vu.Context().Done():
e.vu.State().Logger.Warnf("setInterval %d was stopped because the VU iteration was interrupted", id)
return
}
// https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html#timer-initialisation-steps
// NOTE: previousId from the specification is always send and it is basically id
func (e *Timers) timerInitialization(callback goja.Callable, timeout float64, args []goja.Value, repeat bool, id int) {
// skip all the nesting stuff as we do not care about them
if timeout < 0 {
timeout = 0
}

task := func() error {
if _, exist := e.timers[id]; !exist {
return nil // 8.1
}
}()

return id
err := e.call(callback, args)

if _, exist := e.timers[id]; !exist { // 8.4
return err
}

if repeat {
e.timerInitialization(callback, timeout, args, repeat, id)
} else {
delete(e.timers, id)
}

return err
}

e.runAfterTimeout(timeout, task, id)
}

func (e *Timers) clearInterval(id uint32) {
e.stopTimerCh(id)
// https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html#run-steps-after-a-timeout
// Notes:
// orderingId is not really used in this case
// id is also required for us unlike how it is defined. Maybe in the future if this moves to core it will be expanded
func (e *Timers) runAfterTimeout(timeout float64, task func() error, id int) {
// TODO figure out a better name
delay := time.Duration(timeout * float64(time.Millisecond))
timer := time.Now().Add(delay)
e.timers[id] = timer

// as we have only one orderingId we have one queue
// TODO add queue type and a map of queues when/if we have more then one orderingId
var index int
// don't use range as we want to index to go over one if it needs to go to the end
for index = 0; index < len(e.timersQueue); index++ {
otherTimer := e.timers[e.timersQueue[index]]
if otherTimer.After(timer) {
break
}
}

e.timersQueue = append(e.timersQueue, 0)
copy(e.timersQueue[index+1:], e.timersQueue[index:])
e.timersQueue[index] = id

e.tasks = append(e.tasks, nil)
copy(e.tasks[index+1:], e.tasks[index:])
e.tasks[index] = task

if index != 0 {
// we are not the earliers in the queue so we can stop here
return
}
e.setupTaskTimeout()
}

func (e *Timers) runFirstTask() error {
e.runOnLoop = nil
tasksLen := len(e.tasks)
if tasksLen == 0 {
return nil // everything was cleared
}

task := e.tasks[0]
copy(e.tasks, e.tasks[1:])
e.tasks = e.tasks[:tasksLen-1]

copy(e.timersQueue, e.timersQueue[1:])
e.timersQueue = e.timersQueue[:tasksLen-1]

err := task()

if len(e.timersQueue) > 0 {
e.setupTaskTimeout()
}
return err
}

func (e *Timers) setupTaskTimeout() {
if e.headTimer != nil {
e.headTimer.Stop()
select {
case <-e.headTimer.C:
default:
}
}
delay := -time.Since(e.timers[e.timersQueue[0]])
if e.runOnLoop == nil {
e.runOnLoop = e.vu.RegisterCallback()
}
e.headTimer = time.AfterFunc(delay, func() {
e.runOnLoop(e.runFirstTask)
})
}
80 changes: 79 additions & 1 deletion timers/timers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,87 @@ func TestSetInterval(t *testing.T) {
print("outside setInterval")
`)
require.NoError(t, err)
require.Greater(t, len(log), 2)
require.Equal(t, len(log), 2)
require.Equal(t, "outside setInterval", log[0])
for i, l := range log[1:] {
require.Equal(t, "in setInterval", l, i)
}
}

func TestSetTimeoutOrder(t *testing.T) {
t.Parallel()
runtime := modulestest.NewRuntime(t)
err := runtime.SetupModuleSystem(map[string]any{"k6/x/timers": New()}, nil, nil)
require.NoError(t, err)

rt := runtime.VU.Runtime()
var log []string
require.NoError(t, rt.Set("print", func(s string) { log = append(log, s) }))

_, err = rt.RunString(`globalThis.setTimeout = require("k6/x/timers").setTimeout;`)
require.NoError(t, err)

for i := 0; i < 100; i++ {
_, err = runtime.RunOnEventLoop(`
setTimeout((_) => print("one"), 1);
setTimeout((_) => print("two"), 1);
setTimeout((_) => print("three"), 1);
setTimeout((_) => print("last"), 10);
setTimeout((_) => print("four"), 1);
setTimeout((_) => print("five"), 1);
setTimeout((_) => print("six"), 1);
print("outside setTimeout");
`)
require.NoError(t, err)
require.Equal(t, []string{"outside setTimeout", "one", "two", "three", "four", "five", "six", "last"}, log, i)
log = log[:0]
}
}

func TestSetIntervalOrder(t *testing.T) {
t.Parallel()
runtime := modulestest.NewRuntime(t)
err := runtime.SetupModuleSystem(map[string]any{"k6/x/timers": New()}, nil, nil)
require.NoError(t, err)

rt := runtime.VU.Runtime()
var log []string
require.NoError(t, rt.Set("print", func(s string) { log = append(log, s) }))

_, err = rt.RunString(`globalThis.setInterval = require("k6/x/timers").setInterval;`)
require.NoError(t, err)

_, err = rt.RunString(`globalThis.clearInterval = require("k6/x/timers").clearInterval;`)
require.NoError(t, err)

for i := 0; i < 100; i++ {
_, err = runtime.RunOnEventLoop(`
var one = setInterval((_) => print("one"), 1);
var two = setInterval((_) => print("two"), 1);
var last = setInterval((_) => {
print("last")
clearInterval(one);
clearInterval(two);
clearInterval(three);
clearInterval(last);
}, 4);
var three = setInterval((_) => print("three"), 1);
print("outside");
`)
require.NoError(t, err)
require.GreaterOrEqual(t, len(log), 5)
require.Equal(t, log[0], "outside")
for i := 1; i < len(log)-1; i += 3 {
switch len(log) - i {
case 2:
require.Equal(t, log[i:i+1], []string{"one"})
case 3:
require.Equal(t, log[i:i+2], []string{"one", "two"})
default:
require.Equal(t, log[i:i+3], []string{"one", "two", "three"})
}
}
require.Equal(t, log[len(log)-1], "last")
log = log[:0]
}
}

0 comments on commit d8239b9

Please sign in to comment.