From 01c4f1380669641e86eae86514f941a88a90025e Mon Sep 17 00:00:00 2001 From: nbajaj90 <96036780+nbajaj90@users.noreply.github.com> Date: Thu, 3 Nov 2022 18:45:22 +0530 Subject: [PATCH] Patch sarama_kafka rebalance fix Issue: https://github.com/cloudevents/sdk-go/issues/817 Issue Explanation: https://github.com/Shopify/sarama/issues/2118 Fix reference: https://github.com/Shopify/sarama/blob/5e2c2ef0e429f895c86152189f625bfdad7d3452/examples/consumergroup/main.go#L177 Signed-off-by: nbajaj90 --- protocol/kafka_sarama/v2/receiver.go | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/protocol/kafka_sarama/v2/receiver.go b/protocol/kafka_sarama/v2/receiver.go index 13d6b810f..b46985618 100644 --- a/protocol/kafka_sarama/v2/receiver.go +++ b/protocol/kafka_sarama/v2/receiver.go @@ -53,19 +53,25 @@ func (r *Receiver) Close(context.Context) error { } func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - for message := range claim.Messages() { - msg := message - m := NewMessageFromConsumerMessage(msg) - - r.incoming <- msgErr{ - msg: binding.WithFinish(m, func(err error) { - if protocol.IsACK(err) { - session.MarkMessage(msg, "") - } - }), + for { + select { + case msg, ok := <-claim.Messages(): + if !ok { + return nil + } + m := NewMessageFromConsumerMessage(msg) + + r.incoming <- msgErr{ + msg: binding.WithFinish(m, func(err error) { + if protocol.IsACK(err) { + session.MarkMessage(msg, "") + } + }), + } + case <-session.Context().Done(): + return nil } } - return nil } func (r *Receiver) Receive(ctx context.Context) (binding.Message, error) {