Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

After a rebalance, the receiver keeps receiving messages until it errors out #817

Closed
nbajaj90 opened this issue Nov 2, 2022 · 1 comment

Comments

@nbajaj90
Copy link
Contributor

nbajaj90 commented Nov 2, 2022

After a rebalance, the receiver keeps receiving messages until its error out, and most of these messages fails to commit. Do we need to fix the ConsumeClaim to return once the consumer session ends (because of rebalance), so that the receiver won't keep receiving messages after rebalance.

To fix here:

func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

References:
IBM/sarama#2118
https://github.com/Shopify/sarama/blob/5e2c2ef0e429f895c86152189f625bfdad7d3452/examples/consumergroup/main.go

@nbajaj90 nbajaj90 changed the title After a rebalance, the receiver keeps receiving messages until its error out, and most of these messages fails to commit. Do we need to fix the ConsumeClaim to return once the consumer session ends (because of rebalance), so that the receiver won't keep receiving messages after rebalance. After a rebalance, the receiver keeps receiving messages until its error out Nov 2, 2022
@nbajaj90 nbajaj90 changed the title After a rebalance, the receiver keeps receiving messages until its error out After a rebalance, the receiver keeps receiving messages until it errors out Nov 2, 2022
@nbajaj90
Copy link
Contributor Author

nbajaj90 commented Nov 2, 2022

Expecting something like:

func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			m := NewMessageFromConsumerMessage(msg)

			r.incoming <- msgErr{
				msg: binding.WithFinish(m, func(err error) {
					if protocol.IsACK(err) {
						session.MarkMessage(msg, "")
					}
				}),
			}
		case <-session.Context().Done():
			return nil
		}
	}
}

duglin pushed a commit that referenced this issue Nov 10, 2022
* Patch sarama_kafka rebalance fix

Issue: #817
Issue Explanation: IBM/sarama#2118
Fix reference: https://github.com/Shopify/sarama/blob/5e2c2ef0e429f895c86152189f625bfdad7d3452/examples/consumergroup/main.go#L177

Signed-off-by: nbajaj90 <[email protected]>

* Adding comment as per code review comment

Signed-off-by: nbajaj90 <[email protected]>

* Fixing typo

Signed-off-by: nbajaj90 <[email protected]>

* Incorporated review comment to move comment

Signed-off-by: nbajaj90 <[email protected]>

Signed-off-by: nbajaj90 <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant