Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a queue per handler to keep events in order #555

Merged
merged 8 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 56 additions & 15 deletions common/event_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package common

import (
"context"
"sync"
)

// Ensure BaseEventEmitter implements the EventEmitter interface.
Expand Down Expand Up @@ -96,9 +97,17 @@ type NavigationEvent struct {
err error
}

type queue struct {
writeMutex sync.Mutex
write []Event
readMutex sync.Mutex
read []Event
}
inancgumus marked this conversation as resolved.
Show resolved Hide resolved

type eventHandler struct {
ctx context.Context
ch chan Event
ctx context.Context
ch chan Event
queue *queue
inancgumus marked this conversation as resolved.
Show resolved Hide resolved
}

// EventEmitter that all event emitters need to implement.
Expand All @@ -114,8 +123,10 @@ type syncFunc func() (done chan struct{})

// BaseEventEmitter emits events to registered handlers.
type BaseEventEmitter struct {
handlers map[string][]eventHandler
handlersAll []eventHandler
handlers map[string][]*eventHandler
handlersAll []*eventHandler

queues map[chan Event]*queue

syncCh chan syncFunc
ctx context.Context
Expand All @@ -124,9 +135,10 @@ type BaseEventEmitter struct {
// NewBaseEventEmitter creates a new instance of a base event emitter.
func NewBaseEventEmitter(ctx context.Context) BaseEventEmitter {
bem := BaseEventEmitter{
handlers: make(map[string][]eventHandler),
handlers: make(map[string][]*eventHandler),
syncCh: make(chan syncFunc),
ctx: ctx,
queues: make(map[chan Event]*queue),
}
go bem.syncAll(ctx)
return bem
Expand Down Expand Up @@ -165,21 +177,43 @@ func (e *BaseEventEmitter) sync(fn func()) {
}

func (e *BaseEventEmitter) emit(event string, data interface{}) {
emitEvent := func(eh eventHandler) {
emitEvent := func(eh *eventHandler) {
eh.queue.readMutex.Lock()
defer eh.queue.readMutex.Unlock()

// We try to read from the read queue (queue.read).
// If there isn't anything on the read queue, then there must
// be something being populated by the synched emitTo
// func below.
// Swap around the read queue with the write queue.
// Queue is now being populated again by emitTo, and all
// emitEvent goroutines can continue to consume from
// the read queue until that is again depleted.
if len(eh.queue.read) == 0 {
eh.queue.writeMutex.Lock()
eh.queue.read, eh.queue.write = eh.queue.write, eh.queue.read
eh.queue.writeMutex.Unlock()
}

select {
case eh.ch <- Event{event, data}:
case eh.ch <- eh.queue.read[0]:
eh.queue.read = eh.queue.read[1:]
case <-eh.ctx.Done():
// TODO: handle the error
}
}
emitTo := func(handlers []eventHandler) (updated []eventHandler) {
emitTo := func(handlers []*eventHandler) (updated []*eventHandler) {
for i := 0; i < len(handlers); {
handler := handlers[i]
select {
case <-handler.ctx.Done():
handlers = append(handlers[:i], handlers[i+1:]...)
continue
default:
handler.queue.writeMutex.Lock()
handler.queue.write = append(handler.queue.write, Event{typ: event, data: data})
handler.queue.writeMutex.Unlock()

go emitEvent(handler)
ankur22 marked this conversation as resolved.
Show resolved Hide resolved
i++
}
Expand All @@ -195,20 +229,27 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) {
// On registers a handler for a specific event.
func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Event) {
e.sync(func() {
q, ok := e.queues[ch]
if !ok {
q = &queue{}
e.queues[ch] = q
}
inancgumus marked this conversation as resolved.
Show resolved Hide resolved

for _, event := range events {
_, ok := e.handlers[event]
if !ok {
e.handlers[event] = make([]eventHandler, 0)
}
eh := eventHandler{ctx, ch}
e.handlers[event] = append(e.handlers[event], eh)
e.handlers[event] = append(e.handlers[event], &eventHandler{ctx: ctx, ch: ch, queue: q})
}
})
}

// OnAll registers a handler for all events.
func (e *BaseEventEmitter) onAll(ctx context.Context, ch chan Event) {
e.sync(func() {
e.handlersAll = append(e.handlersAll, eventHandler{ctx, ch})
q, ok := e.queues[ch]
if !ok {
q = &queue{}
e.queues[ch] = q
}

e.handlersAll = append(e.handlersAll, &eventHandler{ctx: ctx, ch: ch, queue: q})
})
}
186 changes: 186 additions & 0 deletions common/event_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ package common
import (
"context"
"testing"
"time"

"github.com/chromedp/cdproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -125,3 +127,187 @@ func TestEventEmitterAllEvents(t *testing.T) {
})
})
}

//nolint:gocognit
func TestBaseEventEmitter(t *testing.T) {
t.Parallel()

t.Run("order of emitted events kept", func(t *testing.T) {
// Test description
//
// 1. Emit many events from the emitWorker.
// 2. Handler receives the emitted events.
//
// Success criteria: Ensure that the ordering of events is
// received in the order they're emitted.

t.Parallel()

eventName := "AtomicIntEvent"
maxInt := 100

ctx, cancel := context.WithCancel(context.Background())
emitter := NewBaseEventEmitter(ctx)
ch := make(chan Event)
emitter.on(ctx, []string{eventName}, ch)

var expectedI int
handler := func() {
defer cancel()

for expectedI != maxInt {
e := <-ch

i, ok := e.data.(int)
if !ok {
assert.FailNow(t, "unexpected type read from channel", e.data)
}

assert.Equal(t, eventName, e.typ)
assert.Equal(t, expectedI, i)

expectedI++
}

close(ch)
}
go handler()

emitWorker := func() {
for i := 0; i < maxInt; i++ {
emitter.emit(eventName, i)
}
}
go emitWorker()

select {
case <-ctx.Done():
case <-time.After(time.Second * 2):
assert.FailNow(t, "test timed out, deadlock?")
}
})

t.Run("order of emitted different event types kept", func(t *testing.T) {
// Test description
//
// 1. Emit many different event types from the emitWorker.
// 2. Handler receives the emitted events.
//
// Success criteria: Ensure that the ordering of events is
// received in the order they're emitted.

t.Parallel()

eventName1 := "AtomicIntEvent1"
eventName2 := "AtomicIntEvent2"
eventName3 := "AtomicIntEvent3"
eventName4 := "AtomicIntEvent4"
maxInt := 100

ctx, cancel := context.WithCancel(context.Background())
emitter := NewBaseEventEmitter(ctx)
ch := make(chan Event)
// Calling on twice to ensure that the same queue is used
// internally for the same channel and handler.
emitter.on(ctx, []string{eventName1, eventName2}, ch)
emitter.on(ctx, []string{eventName3, eventName4}, ch)

var expectedI int
handler := func() {
defer cancel()

for expectedI != maxInt {
e := <-ch

i, ok := e.data.(int)
if !ok {
assert.FailNow(t, "unexpected type read from channel", e.data)
}

assert.Equal(t, expectedI, i)

expectedI++
}

close(ch)
}
go handler()

emitWorker := func() {
for i := 0; i < maxInt; i += 4 {
emitter.emit(eventName1, i)
emitter.emit(eventName2, i+1)
emitter.emit(eventName3, i+2)
emitter.emit(eventName4, i+3)
}
}
go emitWorker()

select {
case <-ctx.Done():
case <-time.After(time.Second * 2):
assert.FailNow(t, "test timed out, deadlock?")
}
})

t.Run("handler can emit without deadlocking", func(t *testing.T) {
// Test description
//
// 1. Emit many events from the emitWorker.
// 2. Handler receives emitted events (AtomicIntEvent1).
// 3. Handler emits event as AtomicIntEvent2.
// 4. Handler received emitted events again (AtomicIntEvent2).
//
// Success criteria: No deadlock should occur between receiving,
// emitting, and receiving of events.

t.Parallel()

eventName1 := "AtomicIntEvent1"
eventName2 := "AtomicIntEvent2"
maxInt := 100

ctx, cancel := context.WithCancel(context.Background())
emitter := NewBaseEventEmitter(ctx)
ch := make(chan Event)
emitter.on(ctx, []string{eventName1, eventName2}, ch)

var expectedI2 int
handler := func() {
defer cancel()

for expectedI2 != maxInt {
e := <-ch

switch e.typ {
case eventName1:
i, ok := e.data.(int)
if !ok {
assert.FailNow(t, "unexpected type read from channel", e.data)
}
emitter.emit(eventName2, i)
case eventName2:
expectedI2++
default:
assert.FailNow(t, "unexpected event type received")
}
}

close(ch)
}
go handler()

emitWorker := func() {
for i := 0; i < maxInt; i++ {
emitter.emit(eventName1, i)
}
}
go emitWorker()

select {
case <-ctx.Done():
case <-time.After(time.Second * 2):
assert.FailNow(t, "test timed out, deadlock?")
}
})
}