Skip to content

Commit

Permalink
Fix a blocking subscriber prevents other subscriber from receiving a …
Browse files Browse the repository at this point in the history
…message (#256)

A gochannel subscriber could prevent subscribers which were registered
afterwards from receiving a message.

Steps to reproduce:
1. Create a subscriber A subscribing to topic T.
2. Create a subscriber B subscribing to topic T.
3. Send a message on topic T.
4. Subscriber B will never receive a message as long as subscriber A
   doesn't call ack.

This pull request fixes this behaviour by distributing the messages
concurrently.

Fixes #247.
  • Loading branch information
boreq authored Jan 26, 2022
1 parent 54d4b23 commit 0a6ea1b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
11 changes: 10 additions & 1 deletion pubsub/gochannel/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,19 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan
}

go func(subscribers []*subscriber) {
wg := &sync.WaitGroup{}

for i := range subscribers {
subscriber := subscribers[i]
subscriber.sendMessageToSubscriber(message, logFields)

wg.Add(1)
go func() {
subscriber.sendMessageToSubscriber(message, logFields)
wg.Done()
}()
}

wg.Wait()
close(ackedBySubscribers)
}(subscribers)

Expand Down
38 changes: 38 additions & 0 deletions pubsub/gochannel/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,44 @@ func TestPublish_race_condition_when_closing(t *testing.T) {
}
}

func TestPublishSubscribe_do_not_block_other_subscribers(t *testing.T) {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(true, true),
)
topicName := "test_topic_" + watermill.NewUUID()

msgsFromSubscriber1, err := pubSub.Subscribe(context.Background(), topicName)
require.NoError(t, err)

_, err = pubSub.Subscribe(context.Background(), topicName)
require.NoError(t, err)

msgsFromSubscriber3, err := pubSub.Subscribe(context.Background(), topicName)
require.NoError(t, err)

err = pubSub.Publish(topicName, message.NewMessage("1", nil))
require.NoError(t, err)

received := make(chan struct{})
go func() {
msg := <-msgsFromSubscriber1
msg.Ack()

msg = <-msgsFromSubscriber3
msg.Ack()

close(received)
}()

select {
case <-received:
// ok
case <-time.After(5 * time.Second):
t.Fatal("subscriber which didn't ack a message blocked other subscribers from receiving it")
}
}

func testPublishSubscribeSubRace(t *testing.T) {
t.Helper()

Expand Down

0 comments on commit 0a6ea1b

Please sign in to comment.