From b28c3ed1cdb478af4eff898bea0414054f9c67f5 Mon Sep 17 00:00:00 2001 From: ankur22 Date: Wed, 5 Oct 2022 15:52:09 +0100 Subject: [PATCH] Add double queue for ordered thread safe system There are two criterias that need to be fulfilled. One is to be able to send and receive events in the same order that were emitted. The second criteria is that a handler must be able to receive and emit events. To enable this we've used a double queue system where by threads can be emitting events, which are queued up, and the emitEvent goroutines can read events from a different queue without deadlocking. When the queue that emitEvent goroutines use is depleted, the queues are swapped around so that the emitters are now filling in the empty queue and the emitEvent goroutines are reading from the filled queue. --- common/event_emitter.go | 42 +++++++----- common/event_emitter_test.go | 126 +++++++++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+), 17 deletions(-) diff --git a/common/event_emitter.go b/common/event_emitter.go index 8a7328ce9..14c03a6ac 100644 --- a/common/event_emitter.go +++ b/common/event_emitter.go @@ -98,10 +98,12 @@ type NavigationEvent struct { } type eventHandler struct { - ctx context.Context - ch chan Event - queueMutex *sync.Mutex - queue []Event + ctx context.Context + ch chan Event + queueMutex *sync.Mutex + queue []Event + curQueueMutex *sync.Mutex + curQueue []Event } // EventEmitter that all event emitters need to implement. @@ -169,12 +171,25 @@ func (e *BaseEventEmitter) sync(fn func()) { func (e *BaseEventEmitter) emit(event string, data interface{}) { emitEvent := func(eh *eventHandler) { - eh.queueMutex.Lock() - defer eh.queueMutex.Unlock() + eh.curQueueMutex.Lock() + defer eh.curQueueMutex.Unlock() + + // We try to read from the current queue (curQueue) + // If there isn't anything on curQueue then there must + // be something being populated by the synched emitTo + // func below. Swap around curQueue with queue. Queue + // is now being populated again by emitTo, and all + // emitEvent goroutines can continue to consume from + // curQueue until that is again depleted. + if len(eh.curQueue) == 0 { + eh.queueMutex.Lock() + eh.curQueue, eh.queue = eh.queue, eh.curQueue + eh.queueMutex.Unlock() + } select { - case eh.ch <- eh.queue[0]: - eh.queue = eh.queue[1:] + case eh.ch <- eh.curQueue[0]: + eh.curQueue = eh.curQueue[1:] case <-eh.ctx.Done(): // TODO: handle the error } @@ -187,13 +202,6 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) { handlers = append(handlers[:i], handlers[i+1:]...) continue default: - // To ensure that goroutines don't break the ordering - // of the emitted events, we will need the goroutine to synchronize. - // This means that the events need to be stored in a queue per handler. The - // goroutine responsible for the the first popped element must complete - // publishing the event before the next goroutine can pop and work with - // the next event for that one handler. Each handler can process events concurrently still. - handler.queueMutex.Lock() handler.queue = append(handler.queue, Event{typ: event, data: data}) handler.queueMutex.Unlock() @@ -218,7 +226,7 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even if !ok { e.handlers[event] = make([]*eventHandler, 0) } - eh := eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)} + eh := eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0), &sync.Mutex{}, make([]Event, 0)} e.handlers[event] = append(e.handlers[event], &eh) } }) @@ -227,6 +235,6 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even // OnAll registers a handler for all events. func (e *BaseEventEmitter) onAll(ctx context.Context, ch chan Event) { e.sync(func() { - e.handlersAll = append(e.handlersAll, &eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)}) + e.handlersAll = append(e.handlersAll, &eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0), &sync.Mutex{}, make([]Event, 0)}) }) } diff --git a/common/event_emitter_test.go b/common/event_emitter_test.go index 2e37a8d9d..9b6796e7d 100644 --- a/common/event_emitter_test.go +++ b/common/event_emitter_test.go @@ -22,9 +22,11 @@ package common import ( "context" + "sync" "testing" "github.com/chromedp/cdproto" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -125,3 +127,127 @@ func TestEventEmitterAllEvents(t *testing.T) { }) }) } + +func TestBaseEventEmitter(t *testing.T) { + t.Run("order of emitted events kept", func(t *testing.T) { + // Test description + // + // 1. Emit many events from the emitWorker. + // 2. Handler receives the emitted events. + // + // Success criteria: Ensure that the ordering of events is + // received in the order they're emitted. + + t.Parallel() + + eventName := "AtomicIntEvent" + maxInt := 100 + + ctx, cancel := context.WithCancel(context.Background()) + emitter := NewBaseEventEmitter(ctx) + ch := make(chan Event) + emitter.on(ctx, []string{eventName}, ch) + + wg := sync.WaitGroup{} + + var expectedI int + wg.Add(1) + handler := func() { + defer wg.Done() + + for expectedI != maxInt { + e := <-ch + + i := e.data.(int) + + assert.Equal(t, eventName, e.typ) + assert.Equal(t, expectedI, i) + + expectedI++ + } + + cancel() + close(ch) + } + go handler() + + wg.Add(1) + emitWorker := func() { + defer wg.Done() + + for i := 0; i < maxInt; i++ { + i := i + emitter.emit(eventName, i) + } + } + go emitWorker() + + wg.Wait() + }) + + t.Run("handler can emit without deadlocking", func(t *testing.T) { + // Test description + // + // 1. Emit many events from the emitWorker. + // 2. Handler receives emitted events (AtomicIntEvent1). + // 3. Handler emits event as AtomicIntEvent2. + // 4. Handler received emitted events again (AtomicIntEvent2). + // + // Success criteria: No deadlock should occur between receiving, + // emitting, and receiving of events. + + t.Parallel() + + eventName1 := "AtomicIntEvent1" + eventName2 := "AtomicIntEvent2" + maxInt := 100 + + ctx, cancel := context.WithCancel(context.Background()) + emitter := NewBaseEventEmitter(ctx) + ch := make(chan Event) + emitter.on(ctx, []string{eventName1, eventName2}, ch) + + wg := sync.WaitGroup{} + + var expectedI2 int + wg.Add(1) + handler := func() { + defer wg.Done() + + for { + if expectedI2 == maxInt { + break + } + + e := <-ch + + switch e.typ { + case eventName1: + i := e.data.(int) + emitter.emit(eventName2, i) + case eventName2: + expectedI2++ + default: + assert.Fail(t, "unexpected event type received") + } + } + + cancel() + close(ch) + } + go handler() + + wg.Add(1) + emitWorker := func() { + defer wg.Done() + + for i := 0; i < maxInt; i++ { + i := i + emitter.emit(eventName1, i) + } + } + go emitWorker() + + wg.Wait() + }) +}