Skip to content
This repository has been archived by the owner on Oct 16, 2024. It is now read-only.

Commit

Permalink
Rewrite timers to be more specification compliant (#7)
Browse files Browse the repository at this point in the history
* Rewrite timers to be more specification compliant

This is more or less a complete rewrite following the specification
including leaving links to relevant parts and comments on why we
diverge.

Among other changes are:
1. timeout/intervals that triggered but weren't executed before they
   were cleared are *not* executed.
2. setInterval gets rescheduled after it's execution as per the
   specification.
3. setTimeout/setInterval triggers are ordered based on their invocation
   as well as the timeout. Again as per the specification.
4. use uint64 for timer IDs so we can go above 4 billion 🚀 

Fixes #3

---------

Co-authored-by: Oleg Bespalov <oleg.bespalov@grafana.com>
mstoykov and olegbespalov authored Dec 14, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent abc1b57 commit 562ce46
Showing 2 changed files with 279 additions and 83 deletions.
282 changes: 200 additions & 82 deletions timers/timers.go
Original file line number Diff line number Diff line change
@@ -2,12 +2,11 @@
package timers

import (
"sync"
"sync/atomic"
"time"

"github.com/dop251/goja"
"github.com/mstoykov/k6-taskqueue-lib/taskqueue"

"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
)

@@ -19,9 +18,16 @@ type RootModule struct{}
type Timers struct {
vu modules.VU

timerStopCounter uint32
timerStopsLock sync.Mutex
timerStops map[uint32]chan struct{}
timerIDCounter uint64

timers map[uint64]time.Time
// Maybe in the future if this moves to core it will be expanded to have multiple queues
queue *timerQueue

// this used predominantly to get around very unlikely race conditions as we are adding stuff to the event loop
// from outside of it on multitple timers. And it is easier to just use this then redo half the work it does
// to make that safe
taskQueue *taskqueue.TaskQueue
}

var (
@@ -38,8 +44,9 @@ func New() *RootModule {
// a new instance for each VU.
func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
return &Timers{
vu: vu,
timerStops: make(map[uint32]chan struct{}),
vu: vu,
timers: make(map[uint64]time.Time),
queue: new(timerQueue),
}
}

@@ -55,100 +62,211 @@ func (e *Timers) Exports() modules.Exports {
}
}

func noop() error { return nil }
func (e *Timers) nextID() uint64 {
e.timerIDCounter++
return e.timerIDCounter
}

func (e *Timers) call(callback goja.Callable, args []goja.Value) error {
// TODO: investigate, not sure GlobalObject() is always the correct value for `this`?
_, err := callback(e.vu.Runtime().GlobalObject(), args...)
return err
}

func (e *Timers) getTimerStopCh() (uint32, chan struct{}) {
id := atomic.AddUint32(&e.timerStopCounter, 1)
ch := make(chan struct{})
e.timerStopsLock.Lock()
e.timerStops[id] = ch
e.timerStopsLock.Unlock()
return id, ch
func (e *Timers) setTimeout(callback goja.Callable, delay float64, args ...goja.Value) uint64 {
id := e.nextID()
e.timerInitialization(callback, delay, args, false, id)
return id
}

func (e *Timers) stopTimerCh(id uint32) bool { //nolint:unparam
e.timerStopsLock.Lock()
defer e.timerStopsLock.Unlock()
ch, ok := e.timerStops[id]
if !ok {
return false
func (e *Timers) clearTimeout(id uint64) {
_, exists := e.timers[id]
if !exists {
return
}
delete(e.timerStops, id)
close(ch)
return true
delete(e.timers, id)

e.queue.remove(id)
e.freeEventLoopIfPossible()
}

func (e *Timers) call(callback goja.Callable, args []goja.Value) error {
// TODO: investigate, not sure GlobalObject() is always the correct value for `this`?
_, err := callback(e.vu.Runtime().GlobalObject(), args...)
func (e *Timers) freeEventLoopIfPossible() {
if e.queue.length() == 0 && e.taskQueue != nil {
e.taskQueue.Close()
e.taskQueue = nil
}
}

func (e *Timers) setInterval(callback goja.Callable, delay float64, args ...goja.Value) uint64 {
id := e.nextID()
e.timerInitialization(callback, delay, args, true, id)
return id
}

func (e *Timers) clearInterval(id uint64) {
e.clearTimeout(id)
}

// https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html#timer-initialisation-steps
// NOTE: previousId from the specification is always send and it is basically id
func (e *Timers) timerInitialization(
callback goja.Callable, timeout float64, args []goja.Value, repeat bool, id uint64,
) {
// skip all the nesting stuff as we do not care about them
if timeout < 0 {
timeout = 0
}

task := func() error {
// Specification 8.1: If id does not exist in global's map of active timers, then abort these steps.
if _, exist := e.timers[id]; !exist {
return nil
}

err := e.call(callback, args)

if _, exist := e.timers[id]; !exist { // 8.4
return err
}

if repeat {
e.timerInitialization(callback, timeout, args, repeat, id)
} else {
delete(e.timers, id)
}

return err
}

e.runAfterTimeout(timeout, task, id)
}

// https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html#run-steps-after-a-timeout
// Notes:
// orderingId is not really used in this case
// id is also required for us unlike how it is defined. Maybe in the future if this moves to core it will be expanded
func (e *Timers) runAfterTimeout(timeout float64, task func() error, id uint64) {
delay := time.Duration(timeout * float64(time.Millisecond))
triggerTime := time.Now().Add(delay)
e.timers[id] = triggerTime

// as we have only one orderingId we have one queue
index := e.queue.add(&timer{
id: id,
task: task,
nextTrigger: triggerTime,
})

if index != 0 {
return // not a timer at the very beginning
}

e.setupTaskTimeout()
}

func (e *Timers) runFirstTask() error {
t := e.queue.pop()
if t == nil {
return nil // everything was cleared
}

err := t.task()

if e.queue.length() > 0 {
e.setupTaskTimeout()
} else {
e.freeEventLoopIfPossible()
}

return err
}

func (e *Timers) setTimeout(callback goja.Callable, delay float64, args ...goja.Value) uint32 {
runOnLoop := e.vu.RegisterCallback()
id, stopCh := e.getTimerStopCh()
func (e *Timers) setupTaskTimeout() {
e.queue.stopTimer()
delay := -time.Since(e.timers[e.queue.first().id])
if e.taskQueue == nil {
e.taskQueue = taskqueue.New(e.vu.RegisterCallback)
}
q := e.taskQueue
e.queue.head = time.AfterFunc(delay, func() {
q.Queue(e.runFirstTask)
})
}

// this is just a small struct to keep the internals of a timer
type timer struct {
id uint64
nextTrigger time.Time
task func() error
}

// this is just a list of timers that should be ordered once after the other
// this mostly just has methods to work on the slice
type timerQueue struct {
queue []*timer
head *time.Timer
}

if delay < 0 {
delay = 0
func (tq *timerQueue) add(t *timer) int {
var i int
// don't use range as we want to index to go over one if it needs to go to the end
for ; i < len(tq.queue); i++ {
if tq.queue[i].nextTrigger.After(t.nextTrigger) {
break
}
}

go func() {
timer := time.NewTimer(time.Duration(delay * float64(time.Millisecond)))
defer func() {
timer.Stop()
e.stopTimerCh(id)
}()
tq.queue = append(tq.queue, nil)
copy(tq.queue[i+1:], tq.queue[i:])
tq.queue[i] = t
return i
}

func (tq *timerQueue) stopTimer() {
if tq.head != nil && tq.head.Stop() { // we have a timer and we stopped it before it was over.
select {
case <-timer.C:
runOnLoop(func() error {
return e.call(callback, args)
})
case <-stopCh:
runOnLoop(noop)
case <-e.vu.Context().Done():
e.vu.State().Logger.Warnf("setTimeout %d was stopped because the VU iteration was interrupted", id)
runOnLoop(noop)
case <-tq.head.C:
default:
}
}()
}
}

return id
func (tq *timerQueue) remove(id uint64) {
i := tq.findIndex(id)
if i == -1 {
return
}

tq.queue = append(tq.queue[:i], tq.queue[i+1:]...)
}

func (e *Timers) clearTimeout(id uint32) {
e.stopTimerCh(id)
}

func (e *Timers) setInterval(callback goja.Callable, delay float64, args ...goja.Value) uint32 {
tq := taskqueue.New(e.vu.RegisterCallback)
id, stopCh := e.getTimerStopCh()

go func() {
ticker := time.NewTicker(time.Duration(delay * float64(time.Millisecond)))
defer func() {
e.stopTimerCh(id)
ticker.Stop()
}()

for {
defer tq.Close()
select {
case <-ticker.C:
tq.Queue(func() error {
return e.call(callback, args)
})
case <-stopCh:
return
case <-e.vu.Context().Done():
e.vu.State().Logger.Warnf("setInterval %d was stopped because the VU iteration was interrupted", id)
return
}
func (tq *timerQueue) findIndex(id uint64) int {
for i, timer := range tq.queue {
if id == timer.id {
return i
}
}()
}
return -1
}

return id
func (tq *timerQueue) pop() *timer {
length := len(tq.queue)
if length == 0 {
return nil
}
t := tq.queue[0]
copy(tq.queue, tq.queue[1:])
tq.queue = tq.queue[:length-1]
return t
}

func (tq *timerQueue) length() int {
return len(tq.queue)
}

func (e *Timers) clearInterval(id uint32) {
e.stopTimerCh(id)
func (tq *timerQueue) first() *timer {
if tq.length() == 0 {
return nil
}
return tq.queue[0]
}
80 changes: 79 additions & 1 deletion timers/timers_test.go
Original file line number Diff line number Diff line change
@@ -54,9 +54,87 @@ func TestSetInterval(t *testing.T) {
print("outside setInterval")
`)
require.NoError(t, err)
require.Greater(t, len(log), 2)
require.Equal(t, len(log), 2)
require.Equal(t, "outside setInterval", log[0])
for i, l := range log[1:] {
require.Equal(t, "in setInterval", l, i)
}
}

func TestSetTimeoutOrder(t *testing.T) {
t.Parallel()
runtime := modulestest.NewRuntime(t)
err := runtime.SetupModuleSystem(map[string]any{"k6/x/timers": New()}, nil, nil)
require.NoError(t, err)

rt := runtime.VU.Runtime()
var log []string
require.NoError(t, rt.Set("print", func(s string) { log = append(log, s) }))

_, err = rt.RunString(`globalThis.setTimeout = require("k6/x/timers").setTimeout;`)
require.NoError(t, err)

for i := 0; i < 100; i++ {
_, err = runtime.RunOnEventLoop(`
setTimeout((_) => print("one"), 1);
setTimeout((_) => print("two"), 1);
setTimeout((_) => print("three"), 1);
setTimeout((_) => print("last"), 10);
setTimeout((_) => print("four"), 1);
setTimeout((_) => print("five"), 1);
setTimeout((_) => print("six"), 1);
print("outside setTimeout");
`)
require.NoError(t, err)
require.Equal(t, []string{"outside setTimeout", "one", "two", "three", "four", "five", "six", "last"}, log, i)
log = log[:0]
}
}

func TestSetIntervalOrder(t *testing.T) {
t.Parallel()
runtime := modulestest.NewRuntime(t)
err := runtime.SetupModuleSystem(map[string]any{"k6/x/timers": New()}, nil, nil)
require.NoError(t, err)

rt := runtime.VU.Runtime()
var log []string
require.NoError(t, rt.Set("print", func(s string) { log = append(log, s) }))

_, err = rt.RunString(`globalThis.setInterval = require("k6/x/timers").setInterval;`)
require.NoError(t, err)

_, err = rt.RunString(`globalThis.clearInterval = require("k6/x/timers").clearInterval;`)
require.NoError(t, err)

for i := 0; i < 100; i++ {
_, err = runtime.RunOnEventLoop(`
var one = setInterval((_) => print("one"), 1);
var two = setInterval((_) => print("two"), 1);
var last = setInterval((_) => {
print("last")
clearInterval(one);
clearInterval(two);
clearInterval(three);
clearInterval(last);
}, 4);
var three = setInterval((_) => print("three"), 1);
print("outside");
`)
require.NoError(t, err)
require.GreaterOrEqual(t, len(log), 5)
require.Equal(t, log[0], "outside")
for i := 1; i < len(log)-1; i += 3 {
switch len(log) - i {
case 2:
require.Equal(t, log[i:i+1], []string{"one"})
case 3:
require.Equal(t, log[i:i+2], []string{"one", "two"})
default:
require.Equal(t, log[i:i+3], []string{"one", "two", "three"})
}
}
require.Equal(t, log[len(log)-1], "last")
log = log[:0]
}
}

0 comments on commit 562ce46

Please sign in to comment.