Skip to content

Commit

Permalink
Add double queue for ordered thread safe system
Browse files Browse the repository at this point in the history
There are two criterias that need to be fulfilled. One is to be able to
send and receive events in the same order that were emitted. The second
criteria is that a handler must be able to receive and emit events.

To enable this we've used a double queue system where by threads can be
emitting events, which are queued up, and the emitEvent goroutines can
read events from a different queue without deadlocking.

When the queue that emitEvent goroutines use is depleted, the queues
are swapped around so that the emitters are now filling in the empty
queue and the emitEvent goroutines are reading from the filled queue.
  • Loading branch information
ankur22 committed Oct 5, 2022
1 parent 8de1ae5 commit b28c3ed
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 17 deletions.
42 changes: 25 additions & 17 deletions common/event_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,12 @@ type NavigationEvent struct {
}

type eventHandler struct {
ctx context.Context
ch chan Event
queueMutex *sync.Mutex
queue []Event
ctx context.Context
ch chan Event
queueMutex *sync.Mutex
queue []Event
curQueueMutex *sync.Mutex
curQueue []Event
}

// EventEmitter that all event emitters need to implement.
Expand Down Expand Up @@ -169,12 +171,25 @@ func (e *BaseEventEmitter) sync(fn func()) {

func (e *BaseEventEmitter) emit(event string, data interface{}) {
emitEvent := func(eh *eventHandler) {
eh.queueMutex.Lock()
defer eh.queueMutex.Unlock()
eh.curQueueMutex.Lock()
defer eh.curQueueMutex.Unlock()

// We try to read from the current queue (curQueue)
// If there isn't anything on curQueue then there must
// be something being populated by the synched emitTo
// func below. Swap around curQueue with queue. Queue
// is now being populated again by emitTo, and all
// emitEvent goroutines can continue to consume from
// curQueue until that is again depleted.
if len(eh.curQueue) == 0 {
eh.queueMutex.Lock()
eh.curQueue, eh.queue = eh.queue, eh.curQueue
eh.queueMutex.Unlock()
}

select {
case eh.ch <- eh.queue[0]:
eh.queue = eh.queue[1:]
case eh.ch <- eh.curQueue[0]:
eh.curQueue = eh.curQueue[1:]
case <-eh.ctx.Done():
// TODO: handle the error
}
Expand All @@ -187,13 +202,6 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) {
handlers = append(handlers[:i], handlers[i+1:]...)
continue
default:
// To ensure that goroutines don't break the ordering
// of the emitted events, we will need the goroutine to synchronize.
// This means that the events need to be stored in a queue per handler. The
// goroutine responsible for the the first popped element must complete
// publishing the event before the next goroutine can pop and work with
// the next event for that one handler. Each handler can process events concurrently still.

handler.queueMutex.Lock()
handler.queue = append(handler.queue, Event{typ: event, data: data})
handler.queueMutex.Unlock()
Expand All @@ -218,7 +226,7 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even
if !ok {
e.handlers[event] = make([]*eventHandler, 0)
}
eh := eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)}
eh := eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0), &sync.Mutex{}, make([]Event, 0)}
e.handlers[event] = append(e.handlers[event], &eh)
}
})
Expand All @@ -227,6 +235,6 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even
// OnAll registers a handler for all events.
func (e *BaseEventEmitter) onAll(ctx context.Context, ch chan Event) {
e.sync(func() {
e.handlersAll = append(e.handlersAll, &eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)})
e.handlersAll = append(e.handlersAll, &eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0), &sync.Mutex{}, make([]Event, 0)})
})
}
126 changes: 126 additions & 0 deletions common/event_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ package common

import (
"context"
"sync"
"testing"

"github.com/chromedp/cdproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -125,3 +127,127 @@ func TestEventEmitterAllEvents(t *testing.T) {
})
})
}

func TestBaseEventEmitter(t *testing.T) {
t.Run("order of emitted events kept", func(t *testing.T) {
// Test description
//
// 1. Emit many events from the emitWorker.
// 2. Handler receives the emitted events.
//
// Success criteria: Ensure that the ordering of events is
// received in the order they're emitted.

t.Parallel()

eventName := "AtomicIntEvent"
maxInt := 100

ctx, cancel := context.WithCancel(context.Background())
emitter := NewBaseEventEmitter(ctx)
ch := make(chan Event)
emitter.on(ctx, []string{eventName}, ch)

wg := sync.WaitGroup{}

var expectedI int
wg.Add(1)
handler := func() {
defer wg.Done()

for expectedI != maxInt {
e := <-ch

i := e.data.(int)

assert.Equal(t, eventName, e.typ)
assert.Equal(t, expectedI, i)

expectedI++
}

cancel()
close(ch)
}
go handler()

wg.Add(1)
emitWorker := func() {
defer wg.Done()

for i := 0; i < maxInt; i++ {
i := i
emitter.emit(eventName, i)
}
}
go emitWorker()

wg.Wait()
})

t.Run("handler can emit without deadlocking", func(t *testing.T) {
// Test description
//
// 1. Emit many events from the emitWorker.
// 2. Handler receives emitted events (AtomicIntEvent1).
// 3. Handler emits event as AtomicIntEvent2.
// 4. Handler received emitted events again (AtomicIntEvent2).
//
// Success criteria: No deadlock should occur between receiving,
// emitting, and receiving of events.

t.Parallel()

eventName1 := "AtomicIntEvent1"
eventName2 := "AtomicIntEvent2"
maxInt := 100

ctx, cancel := context.WithCancel(context.Background())
emitter := NewBaseEventEmitter(ctx)
ch := make(chan Event)
emitter.on(ctx, []string{eventName1, eventName2}, ch)

wg := sync.WaitGroup{}

var expectedI2 int
wg.Add(1)
handler := func() {
defer wg.Done()

for {
if expectedI2 == maxInt {
break
}

e := <-ch

switch e.typ {
case eventName1:
i := e.data.(int)
emitter.emit(eventName2, i)
case eventName2:
expectedI2++
default:
assert.Fail(t, "unexpected event type received")
}
}

cancel()
close(ch)
}
go handler()

wg.Add(1)
emitWorker := func() {
defer wg.Done()

for i := 0; i < maxInt; i++ {
i := i
emitter.emit(eventName1, i)
}
}
go emitWorker()

wg.Wait()
})
}

0 comments on commit b28c3ed

Please sign in to comment.