diff --git a/blockchain/subscription.go b/blockchain/subscription.go index 34a147342f..145a727fa0 100644 --- a/blockchain/subscription.go +++ b/blockchain/subscription.go @@ -42,19 +42,7 @@ type subscription struct { // GetEventCh creates a new event channel, and returns it func (s *subscription) GetEventCh() chan *Event { - eventCh := make(chan *Event) - - go func() { - for { - evnt := s.GetEvent() - if evnt == nil { - return - } - eventCh <- evnt - } - }() - - return eventCh + return s.updateCh } // GetEvent returns the event from the subscription (BLOCKING) @@ -161,11 +149,7 @@ func (e *eventStream) newUpdateCh() chan *Event { e.Lock() defer e.Unlock() - ch := make(chan *Event, 1) - - if e.updateCh == nil { - e.updateCh = make([]chan *Event, 0) - } + ch := make(chan *Event, 5) e.updateCh = append(e.updateCh, ch) @@ -179,9 +163,6 @@ func (e *eventStream) push(event *Event) { // Notify the listeners for _, update := range e.updateCh { - select { - case update <- event: - default: - } + update <- event } } diff --git a/blockchain/subscription_test.go b/blockchain/subscription_test.go index 9e854a20a9..26acd59cf4 100644 --- a/blockchain/subscription_test.go +++ b/blockchain/subscription_test.go @@ -51,3 +51,61 @@ func TestSubscription(t *testing.T) { assert.Equal(t, event.NewChain[0].Number, caughtEventNum) } + +func TestSubscription_BufferedChannel_MultipleSubscriptions(t *testing.T) { + t.Parallel() + + var ( + e = &eventStream{} + wg sync.WaitGroup + numOfEvents = 100000 + numOfSubscriptions = 10 + ) + + subscriptions := make([]*subscription, numOfSubscriptions) + wg.Add(numOfSubscriptions) + + worker := func(id int, sub *subscription) { + updateCh := sub.GetEventCh() + caughtEvents := 0 + + defer wg.Done() + + for { + select { + case <-updateCh: + caughtEvents++ + if caughtEvents == numOfEvents { + return + } + case <-time.After(10 * time.Second): + t.Errorf("subscription %d did not caught all events", id) + } + } + } + + for i := 0; i < numOfSubscriptions; i++ { + sub := e.subscribe() + subscriptions[i] = sub + + go worker(i, sub) + } + + // Send the events to the channels + for i := 0; i < numOfEvents; i++ { + e.push(&Event{ + NewChain: []*types.Header{ + { + Number: uint64(i), + }, + }, + }) + } + + // Wait for the events to be processed + wg.Wait() + + for _, s := range subscriptions { + s.Close() + } +}