diff --git a/pubsub/gochannel/pubsub.go b/pubsub/gochannel/pubsub.go index d268f7c47..da9696624 100644 --- a/pubsub/gochannel/pubsub.go +++ b/pubsub/gochannel/pubsub.go @@ -147,10 +147,19 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan } go func(subscribers []*subscriber) { + wg := &sync.WaitGroup{} + for i := range subscribers { subscriber := subscribers[i] - subscriber.sendMessageToSubscriber(message, logFields) + + wg.Add(1) + go func() { + subscriber.sendMessageToSubscriber(message, logFields) + wg.Done() + }() } + + wg.Wait() close(ackedBySubscribers) }(subscribers) diff --git a/pubsub/gochannel/pubsub_test.go b/pubsub/gochannel/pubsub_test.go index 2a6d47493..ca84f361e 100644 --- a/pubsub/gochannel/pubsub_test.go +++ b/pubsub/gochannel/pubsub_test.go @@ -168,6 +168,44 @@ func TestPublish_race_condition_when_closing(t *testing.T) { } } +func TestPublishSubscribe_do_not_block_other_subscribers(t *testing.T) { + pubSub := gochannel.NewGoChannel( + gochannel.Config{}, + watermill.NewStdLogger(true, true), + ) + topicName := "test_topic_" + watermill.NewUUID() + + msgsFromSubscriber1, err := pubSub.Subscribe(context.Background(), topicName) + require.NoError(t, err) + + _, err = pubSub.Subscribe(context.Background(), topicName) + require.NoError(t, err) + + msgsFromSubscriber3, err := pubSub.Subscribe(context.Background(), topicName) + require.NoError(t, err) + + err = pubSub.Publish(topicName, message.NewMessage("1", nil)) + require.NoError(t, err) + + received := make(chan struct{}) + go func() { + msg := <-msgsFromSubscriber1 + msg.Ack() + + msg = <-msgsFromSubscriber3 + msg.Ack() + + close(received) + }() + + select { + case <-received: + // ok + case <-time.After(5 * time.Second): + t.Fatal("subscriber which didn't ack a message blocked other subscribers from receiving it") + } +} + func testPublishSubscribeSubRace(t *testing.T) { t.Helper()