Skip to content

Commit

Permalink
[pubsub/jetstream] Add missing concurrencyMode support for queue-base…
Browse files Browse the repository at this point in the history
…d handler

Patch for #3222

Signed-off-by: Byron Ruth <[email protected]>
  • Loading branch information
bruth committed Nov 17, 2023
1 parent 1f12557 commit 6826c2d
Showing 1 changed file with 20 additions and 21 deletions.
41 changes: 20 additions & 21 deletions pubsub/jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,21 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
}
}

// Choose the correct handler based on the concurrency model.
var concHandler nats.MsgHandler
switch js.meta.Concurrency {
case pubsub.Single:
concHandler = natsHandler
case pubsub.Parallel:
concHandler = func(msg *nats.Msg) {
js.wg.Add(1)
go func() {
natsHandler(msg)
js.wg.Done()
}()
}
}

var err error
streamName := js.meta.StreamName
if streamName == "" {
Expand All @@ -245,35 +260,19 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
return err
}
}
var subscription *nats.Subscription
var sub *nats.Subscription

consumerInfo, err := js.jsc.AddConsumer(streamName, &consumerConfig)
if err != nil {
return err
}

if queue := js.meta.QueueGroupName; queue != "" {
js.l.Debugf("nats: subscribed to subject %s with queue group %s",
req.Topic, js.meta.QueueGroupName)
subscription, err = js.jsc.QueueSubscribe(req.Topic, queue, natsHandler, nats.Bind(streamName, consumerInfo.Name))
js.l.Debugf("nats: subscribed to subject %s with queue group %s", req.Topic, js.meta.QueueGroupName)
sub, err = js.jsc.QueueSubscribe(req.Topic, queue, concHandler, nats.Bind(streamName, consumerInfo.Name))
} else {
js.l.Debugf("nats: subscribed to subject %s", req.Topic)
subscription, err = js.jsc.Subscribe(
req.Topic,
func(msg *nats.Msg) {
switch js.meta.Concurrency {
case pubsub.Single:
natsHandler(msg)
case pubsub.Parallel:
js.wg.Add(1)
go func() {
natsHandler(msg)
js.wg.Done()
}()
}
},
nats.Bind(streamName, consumerInfo.Name),
)
sub, err = js.jsc.Subscribe(req.Topic, concHandler, nats.Bind(streamName, consumerInfo.Name))
}
if err != nil {
return err
Expand All @@ -286,7 +285,7 @@ func (js *jetstreamPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRe
case <-ctx.Done():
case <-js.closeCh:
}
err := subscription.Unsubscribe()
err := sub.Unsubscribe()
if err != nil {
js.l.Warnf("nats: error while unsubscribing from topic %s: %v", req.Topic, err)
}
Expand Down

0 comments on commit 6826c2d

Please sign in to comment.