diff --git a/common/event_emitter.go b/common/event_emitter.go
index 8a7328ce9..14c03a6ac 100644
--- a/common/event_emitter.go
+++ b/common/event_emitter.go
@@ -98,10 +98,12 @@ type NavigationEvent struct {
 }
 
 type eventHandler struct {
-	ctx        context.Context
-	ch         chan Event
-	queueMutex *sync.Mutex
-	queue      []Event
+	ctx           context.Context
+	ch            chan Event
+	queueMutex    *sync.Mutex
+	queue         []Event
+	curQueueMutex *sync.Mutex
+	curQueue      []Event
 }
 
 // EventEmitter that all event emitters need to implement.
@@ -169,12 +171,25 @@ func (e *BaseEventEmitter) sync(fn func()) {
 
 func (e *BaseEventEmitter) emit(event string, data interface{}) {
 	emitEvent := func(eh *eventHandler) {
-		eh.queueMutex.Lock()
-		defer eh.queueMutex.Unlock()
+		eh.curQueueMutex.Lock()
+		defer eh.curQueueMutex.Unlock()
+
+		// We try to read from the current queue (curQueue).
+		// If there isn't anything on curQueue then there must
+		// be something being populated by the synched emitTo
+		// func below. Swap curQueue with queue. Queue
+		// is now being populated again by emitTo, and all
+		// emitEvent goroutines can continue to consume from
+		// curQueue until that is again depleted.
+		if len(eh.curQueue) == 0 {
+			eh.queueMutex.Lock()
+			eh.curQueue, eh.queue = eh.queue, eh.curQueue
+			eh.queueMutex.Unlock()
+		}
 
 		select {
-		case eh.ch <- eh.queue[0]:
-			eh.queue = eh.queue[1:]
+		case eh.ch <- eh.curQueue[0]:
+			eh.curQueue = eh.curQueue[1:]
 		case <-eh.ctx.Done():
 			// TODO: handle the error
 		}
@@ -187,13 +202,6 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) {
 				handlers = append(handlers[:i], handlers[i+1:]...)
 				continue
 			default:
-				// To ensure that goroutines don't break the ordering
-				// of the emitted events, we will need the goroutine to synchronize.
-				// This means that the events need to be stored in a queue per handler. The
-				// goroutine responsible for the the first popped element must complete
-				// publishing the event before the next goroutine can pop and work with
-				// the next event for that one handler. Each handler can process events concurrently still.
-
 				handler.queueMutex.Lock()
 				handler.queue = append(handler.queue, Event{typ: event, data: data})
 				handler.queueMutex.Unlock()
@@ -218,7 +226,7 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even
 			if !ok {
 				e.handlers[event] = make([]*eventHandler, 0)
 			}
-			eh := eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)}
+			eh := eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0), &sync.Mutex{}, make([]Event, 0)}
 			e.handlers[event] = append(e.handlers[event], &eh)
 		}
 	})
@@ -227,6 +235,6 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even
 // OnAll registers a handler for all events.
 func (e *BaseEventEmitter) onAll(ctx context.Context, ch chan Event) {
 	e.sync(func() {
-		e.handlersAll = append(e.handlersAll, &eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)})
+		e.handlersAll = append(e.handlersAll, &eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0), &sync.Mutex{}, make([]Event, 0)})
 	})
 }
diff --git a/common/event_emitter_test.go b/common/event_emitter_test.go
index 2e37a8d9d..9b6796e7d 100644
--- a/common/event_emitter_test.go
+++ b/common/event_emitter_test.go
@@ -22,9 +22,11 @@ package common
 
 import (
 	"context"
+	"sync"
 	"testing"
 
 	"github.com/chromedp/cdproto"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -125,3 +127,127 @@ func TestEventEmitterAllEvents(t *testing.T) {
 		})
 	})
 }
+
+func TestBaseEventEmitter(t *testing.T) {
+	t.Run("order of emitted events kept", func(t *testing.T) {
+		// Test description
+		//
+		// 1. Emit many events from the emitWorker.
+		// 2. Handler receives the emitted events.
+		//
+		// Success criteria: Ensure that events are received in
+		// the order they were emitted.
+
+		t.Parallel()
+
+		eventName := "AtomicIntEvent"
+		maxInt := 100
+
+		ctx, cancel := context.WithCancel(context.Background())
+		emitter := NewBaseEventEmitter(ctx)
+		ch := make(chan Event)
+		emitter.on(ctx, []string{eventName}, ch)
+
+		wg := sync.WaitGroup{}
+
+		var expectedI int
+		wg.Add(1)
+		handler := func() {
+			defer wg.Done()
+
+			for expectedI != maxInt {
+				e := <-ch
+
+				i := e.data.(int)
+
+				assert.Equal(t, eventName, e.typ)
+				assert.Equal(t, expectedI, i)
+
+				expectedI++
+			}
+
+			cancel()
+			close(ch)
+		}
+		go handler()
+
+		wg.Add(1)
+		emitWorker := func() {
+			defer wg.Done()
+
+			for i := 0; i < maxInt; i++ {
+				i := i
+				emitter.emit(eventName, i)
+			}
+		}
+		go emitWorker()
+
+		wg.Wait()
+	})
+
+	t.Run("handler can emit without deadlocking", func(t *testing.T) {
+		// Test description
+		//
+		// 1. Emit many events from the emitWorker.
+		// 2. Handler receives emitted events (AtomicIntEvent1).
+		// 3. Handler emits each event back as AtomicIntEvent2.
+		// 4. Handler receives the emitted events again (AtomicIntEvent2).
+		//
+		// Success criteria: No deadlock should occur between receiving,
+		// emitting, and receiving events.
+
+		t.Parallel()
+
+		eventName1 := "AtomicIntEvent1"
+		eventName2 := "AtomicIntEvent2"
+		maxInt := 100
+
+		ctx, cancel := context.WithCancel(context.Background())
+		emitter := NewBaseEventEmitter(ctx)
+		ch := make(chan Event)
+		emitter.on(ctx, []string{eventName1, eventName2}, ch)
+
+		wg := sync.WaitGroup{}
+
+		var expectedI2 int
+		wg.Add(1)
+		handler := func() {
+			defer wg.Done()
+
+			for {
+				if expectedI2 == maxInt {
+					break
+				}
+
+				e := <-ch
+
+				switch e.typ {
+				case eventName1:
+					i := e.data.(int)
+					emitter.emit(eventName2, i)
+				case eventName2:
+					expectedI2++
+				default:
+					assert.Fail(t, "unexpected event type received")
+				}
+			}
+
+			cancel()
+			close(ch)
+		}
+		go handler()
+
+		wg.Add(1)
+		emitWorker := func() {
+			defer wg.Done()
+
+			for i := 0; i < maxInt; i++ {
+				i := i
+				emitter.emit(eventName1, i)
+			}
+		}
+		go emitWorker()
+
+		wg.Wait()
+	})
+}
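As a standalone illustration (not part of the patch), the two-queue hand-off the new emit code relies on works like this: producers append to queue under queueMutex, while consumers drain curQueue under curQueueMutex and swap the two slices whenever curQueue runs dry. The sketch below uses hypothetical names (handlerQueues, push, pop) standing in for the eventHandler fields and the emitEvent/emitTo logic in the patch, and ints instead of Event values.

package main

import (
	"fmt"
	"sync"
)

// handlerQueues is a stripped-down stand-in for the per-handler state
// added by the patch: queue is the producer side, curQueue the consumer
// side, each guarded by its own mutex.
type handlerQueues struct {
	queueMutex    sync.Mutex
	queue         []int
	curQueueMutex sync.Mutex
	curQueue      []int
}

// push mirrors the append done by emitTo: it only touches queue.
func (h *handlerQueues) push(v int) {
	h.queueMutex.Lock()
	h.queue = append(h.queue, v)
	h.queueMutex.Unlock()
}

// pop mirrors the emitEvent closure: it consumes from curQueue and, when
// curQueue is empty, swaps it with queue so producers can keep appending
// to a fresh slice while consumers drain the old one.
func (h *handlerQueues) pop() (int, bool) {
	h.curQueueMutex.Lock()
	defer h.curQueueMutex.Unlock()

	if len(h.curQueue) == 0 {
		h.queueMutex.Lock()
		h.curQueue, h.queue = h.queue, h.curQueue
		h.queueMutex.Unlock()
	}
	if len(h.curQueue) == 0 {
		return 0, false // nothing queued yet
	}

	v := h.curQueue[0]
	h.curQueue = h.curQueue[1:]
	return v, true
}

func main() {
	h := &handlerQueues{}
	for i := 0; i < 5; i++ {
		h.push(i)
	}
	for {
		v, ok := h.pop()
		if !ok {
			break
		}
		fmt.Println(v) // prints 0, 1, 2, 3, 4 — FIFO order is preserved
	}
}

In the patch itself, ordering is preserved because each emitEvent goroutine holds curQueueMutex from taking the head of curQueue until the send on eh.ch (or ctx cancellation) completes, so events leave a handler's queue strictly one at a time in append order, while emitTo only contends on queueMutex for the brief append or swap.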