From 0a6ea1b5b4e710a0016dcea4fe01f5d7a2073461 Mon Sep 17 00:00:00 2001 From: Filip Borkiewicz Date: Wed, 26 Jan 2022 21:09:21 +0100 Subject: [PATCH] Fix a blocking subscriber prevents other subscriber from receiving a message (#256) A gochannel subscriber could prevent subscribers which were registered afterwards from receiving a message. Steps to reproduce: 1. Create a subscriber A subscribing to topic T. 2. Create a subscriber B subscribing to topic T. 3. Send a message on topic T. 4. Subscriber B will never receive a message as long as subscriber A doesn't call ack. This pull request fixes this behaviour by distributing the messages concurrently. Fixes #247. --- pubsub/gochannel/pubsub.go | 11 +++++++++- pubsub/gochannel/pubsub_test.go | 38 +++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index d268f7c47..da9696624 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -147,10 +147,19 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan } go func(subscribers []*subscriber) { + wg := &sync.WaitGroup{} + for i := range subscribers { subscriber := subscribers[i] - subscriber.sendMessageToSubscriber(message, logFields) + + wg.Add(1) + go func() { + subscriber.sendMessageToSubscriber(message, logFields) + wg.Done() + }() } + + wg.Wait() close(ackedBySubscribers) }(subscribers) diff --git a/pubsub/gochannel/pubsub_test.go b/pubsub/gochannel/pubsub_test.go index 2a6d47493..ca84f361e 100644 --- a/pubsub/gochannel/pubsub_test.go +++ b/pubsub/gochannel/pubsub_test.go @@ -168,6 +168,44 @@ func TestPublish_race_condition_when_closing(t *testing.T) { } } +func TestPublishSubscribe_do_not_block_other_subscribers(t *testing.T) { + pubSub := gochannel.NewGoChannel( + gochannel.Config{}, + watermill.NewStdLogger(true, true), + ) + topicName := "test_topic_" + watermill.NewUUID() + + msgsFromSubscriber1, err := pubSub.Subscribe(context.Background(), topicName) + require.NoError(t, err) + + _, err = pubSub.Subscribe(context.Background(), topicName) + require.NoError(t, err) + + msgsFromSubscriber3, err := pubSub.Subscribe(context.Background(), topicName) + require.NoError(t, err) + + err = pubSub.Publish(topicName, message.NewMessage("1", nil)) + require.NoError(t, err) + + received := make(chan struct{}) + go func() { + msg := <-msgsFromSubscriber1 + msg.Ack() + + msg = <-msgsFromSubscriber3 + msg.Ack() + + close(received) + }() + + select { + case <-received: + // ok + case <-time.After(5 * time.Second): + t.Fatal("subscriber which didn't ack a message blocked other subscribers from receiving it") + } +} + func testPublishSubscribeSubRace(t *testing.T) { t.Helper()