
Unable to commit offset during rebalancing but messages are consumed and processed #2118

Closed
shweta-fourkites opened this issue Jan 23, 2022 · 6 comments · Fixed by cloudevents/sdk-go#818

Comments

shweta-fourkites commented Jan 23, 2022

Versions


Sarama - v1.30.0
Kafka - 2.2.1.
Go- go1.16.6

Configuration

config.Consumer.Group.Session.Timeout = 20 * time.Second
config.Consumer.Group.Heartbeat.Interval = 6 * time.Second
config.Consumer.MaxProcessingTime = 500 * time.Millisecond
config.Consumer.Offsets.AutoCommit.Enable = false
config.Consumer.Return.Errors = true
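(For context, a minimal sketch of how these overrides are typically applied when building the consumer group; the broker address and group ID below are placeholders, not values from this issue:)

    import (
        "time"

        "github.com/Shopify/sarama" // v1.30.0 per the versions above
    )

    func newConsumerGroup() (sarama.ConsumerGroup, error) {
        config := sarama.NewConfig()
        config.Consumer.Group.Session.Timeout = 20 * time.Second
        config.Consumer.Group.Heartbeat.Interval = 6 * time.Second
        config.Consumer.MaxProcessingTime = 500 * time.Millisecond
        config.Consumer.Offsets.AutoCommit.Enable = false
        config.Consumer.Return.Errors = true

        // Broker address and group ID are placeholders.
        return sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
    }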


Logs



2022/01/23 11:52:55.686008 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/2: kafka server: The provided member is not known in the current generation.
2022/01/23 11:52:55.981009 consumer.go:284: - INFO Processing offset 82704 from partition 2 testtopic/2/82704 testtopic/2/82704 testtopic::others::EVENTTYPE::1014
2022/01/23 11:52:55.981206 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/2: kafka server: The provided member is not known in the current generation.
2022/01/23 11:52:56.274162 consumer.go:284: - INFO Processing offset 82705 from partition 2 testtopic/2/82705 testtopic::others::EVENTTYPE::1130
2022/01/23 11:52:56.274059 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/2: kafka server: The provided member is not known in the current generation.
2022/01/23 11:52:56.572360 consumer.go:284: - INFO Processing offset 82706 from partition 2 testtopic/2/82706 testtopic::others::EVENTTYPE::1099
2022/01/23 11:52:56.572538 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/2: kafka server: The provided member is not known in the current generation.
2022/01/23 11:52:56.877602 consumer.go:284: - INFO Processing offset 82707 from partition 2 testtopic/2/82707 testtopic::others::EVENTTYPE::1133
2022/01/23 11:52:56.877766 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/2: kafka server: The provided member is not known in the current generation.
2022/01/23 11:52:57.182187 consumer.go:284: - INFO Processing offset 82708 from partition 2 testtopic/2/82708 testtopic::others::EVENTTYPE::1038
2022/01/23 11:52:57.182367 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/2: kafka server: The provided member is not known in the current generation.
2022/01/23 11:52:57.477362 consumer.go:284: - INFO Processing offset 82709 from partition 2 testtopic/2/82709 testtopic::others::EVENTTYPE::1055
2022/01/23 11:52:57.477511 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/2: kafka server: The provided member is not known in the current generation.
2022/01/23 11:52:57.477511 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/2: kafka server: The provided member is not known in the current generation.
2022/01/23 11:52:57.781924 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/2: kafka server

Problem Description
  1. Messages are consumed from the partitions assigned to a consumer even during a rebalance, but the offset commit after processing fails with 'kafka server: The provided member is not known in the current generation.' When the partitions are reassigned to a new consumer after it joins the group, messages are duplicated (they were already consumed and processed but not committed).
  2. Rebalances are taking a long time.

Why does claim.Messages() continue to return messages when there is a rebalance? How should this be handled to prevent processing the messages again?

@dnwe, could you please help here? Thanks.

dnwe (Collaborator) commented Jan 23, 2022

As I remember it, as long as the consumer is a member of the group and doesn't disconnect, the generation ID shouldn't get incremented until the (re-)JoinGroup request has been sent.

How many topic partitions do you have assigned to the consumer? Each one will map to its own partition consumer, which provides the claim.Messages() channel. As per the doc comment on claim.Messages():

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.

So with your config you should have 20 seconds from the start of the rebalance to drain and finish processing all of the messages from your assigned partitions' <-claim.Messages() channels and for your call to session.Commit() to have reached the backend. Your log snippet only showed the error messages, so I couldn't see the timestamps of when the session rebalance first started.
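As an aside, one way to capture those timestamps is to point Sarama's debug logger at your application output, e.g. (a minimal sketch; the prefix and flags are arbitrary):

    import (
        "log"
        "os"

        "github.com/Shopify/sarama"
    )

    func init() {
        // Route Sarama's internal debug output (joins, syncs, heartbeats,
        // subscription changes) to stdout next to the application logs.
        sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Lmicroseconds)
    }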

dnwe (Collaborator) commented Jan 23, 2022

It would also be useful to add a log statement to the Cleanup(ConsumerGroupSession) error func in your ConsumerGroupHandler, as that will show when your ConsumeClaim calls (partition consumers) completed their processing and returned so the session could be restarted.
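For instance, a minimal sketch of such instrumentation (the handler type name and log format here are just illustrative):

    func (h *ConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
        // Logged at the start of every session: member, generation and claimed partitions.
        log.Printf("setup: member=%s generation=%d claims=%v",
            session.MemberID(), session.GenerationID(), session.Claims())
        return nil
    }

    func (h *ConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
        // Fires once all ConsumeClaim calls have returned, i.e. when the
        // session is being torn down ahead of a rebalance.
        log.Printf("cleanup: member=%s generation=%d", session.MemberID(), session.GenerationID())
        return nil
    }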

@shweta-fourkites (Author)

@dnwe Thanks for looking into this.

Logs from consumer 2 that joined the group:

2022/01/23 12:54:36.091344 consumer.go:824: consumer/broker/41 added subscription to testtopic/1
2022/01/23 12:54:36.095233 consumer.go:284: - INFO Processing offset 91343 from partition 1 testtopic/1/91343 testvenkatshipper::others::EVENTTYPE::1104
2022/01/23 12:54:36.097734 consumer.go:824: consumer/broker/37 added subscription to testtopic/0

Consumer 1 logs around the same time:

2022/01/23 12:54:36.206173 consumer.go:284: - INFO Processing offset 84635 from partition 0 testtopic/0/84635 testvenkatshipper::others::EVENTTYPE::1156
2022/01/23 12:54:36.207008 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/0: kafka server: The provided member is not known in the current generation.
2022/01/23 12:54:36.271698 consumer.go:284: - INFO Processing offset 83705 from partition 2 testtopic/2/83705 testvenkatshipper::others::EVENTTYPE::1120
2022/01/23 12:54:36.275907 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/2: kafka server: The provided member is not known in the current generation.
2022/01/23 12:54:36.275919 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/0: kafka server: The provided member is not known in the current generation.
2022/01/23 12:54:36.296485 consumer.go:284: - Processing offset 91343 from partition 1 INFO testtopic/1/91343 testvenkatshipper::others::EVENTTYPE::1104
2022/01/23 12:54:36.298043 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/2: kafka server: The provided member is not known in the current generation.
2022/01/23 12:54:36.298053 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/0: kafka server: The provided member is not known in the current generation.
.
.
.
.
2022/01/23 12:55:05.488513 consumer.go:149: - ERROR Consumer Errors: kafka: error while consuming testtopic/0: kafka server: The provided member is not known in the current generation.
2022/01/23 12:55:56.092896 consumer.go:824: consumer/broker/43 added subscription to testtopic/2

As you can see in the above logs, both consumer 1 and consumer 2 received the message at offset 91343 from partition 1. By the time consumer 2 became active, consumer 1 had started throwing the 'The provided member is not known in the current generation' error. This error is raised when session.Commit() is attempted. But claim.Messages() should not have returned messages from this partition at all.

I continued to see the error in the logs for almost 20 seconds.

The partitions are not revoked immediately when a new consumer joins, even though commits are failing and the other consumer has already started processing messages from the same partitions.

The topic that I am using has 3 partitions, and consumers are added automatically based on lag.

Another issue I see: when a 3rd consumer is added, the 2nd consumer rebalances and is assigned partitions 1 and 0, and the third consumer gets partition 2. The first consumer still has not been assigned any partitions, and then another rebalance happens.

It looks like the consumers are not notified immediately of the rebalance and of partition assignments to other consumers, which is why multiple rebalances happen.

I could be doing something wrong; I am not sure yet and am looking for help.

shweta-fourkites (Author) commented Jan 23, 2022

@dnwe ,
I added additional logs in the ConsumerGroupHandler's Setup and Cleanup methods.

Below is what happened.

No. of partitions in my topic - 3

Events :

 20:24:09 - Consumer 1 added to listen to this topic. 
                      Setup Done - **Generation ID 436,
                      Partition Claims - 0,1,2** 
 20:24:54 - Channel was closed [No error from consumer reported though] and Cleanup was done.

 20:24:54 - Setup Done again for consumer 1 , **Generation ID 437**

 20:30:14 - Consumer 2 added to the group -
          Setup Done -  **Generation ID 438,
          Partition Claims - 0,1**
          Started reading and committing messages from **partition 0,1**

          Consumer 1 - continued to read messages from the message channel for partitions 0,1,2
                     - However, after processing, session.Commit() failed with the error
                       **kafka server: The provided member is not known in the current generation** (on all 3 partitions)
 20:30:48 - Consumer 1 Cleanup hook called
 20:30:48 - Consumer 1 Consume() called, returned read i/o timeout error
          [Note: I have been seeing this read i/o timeout error often when new session creation is attempted, but it resolves when re-attempted, and hence I don't return/panic when err != nil from Consume; instead I continue to loop]

          for {
              if err := c.consumer.Consume(ctx, c.kafkaConfig.Topics, c); err != nil {
                  Logger.Error(nil, "[Troubleshooting] Error from consumer: %v", err, c.groupName)
              }
          }

20:30:48 - Consumer 1 Setup Done, **Generation ID - 440, Partition Claims - 2**

20:31:34 - Consumer 2 - Continued to read messages from partition 0,1 but commits failed with the message:
			**kafka server: A rebalance for the group is in progress. Please re-join the group.**
20:31:45 - Consumer 2 - Continued to read messages from partition 0,1 but commits failed with a different message:
			kafka server: **The provided member is not known in the current generation.**
20:31:54 - Consumer 3 joined the group
		   Setup Done - **Generation ID 440** 
		   **Partition claims - 0,1**
		   Started reading and committing messages from partition 0,1

20:32:08 - Consumer 2 Cleanup done
20:32:09 - Consumer 2 Setup done , **Generation ID - 441, claimed partition 0,1**
           Consumer 1 Continued to read partition 2 but commits failed 
           Consumer 3 no error, no commit failures, continued to read 0,1
           **[Note both consumer 2 and 3 reading partition 0,1 with consumer 3 in outdated generation 440]**
20:33:43 - Clean up done for consumer 1,
           Setup done again, **Generation ID - 441
           claimed partitions - 2**

The above process continued and rebalancing kept happening continuously, as all 3 consumers were not notified of a generation update at the same time. The message channel remained open for a consumer with an outdated generation ID.

My overridden configurations -

config.Consumer.Group.Session.Timeout = 20 * time.Second
config.Consumer.Group.Heartbeat.Interval = 6 * time.Second
config.Consumer.MaxProcessingTime = 500 * time.Millisecond
config.Consumer.Offsets.AutoCommit.Enable = false

shweta-fourkites (Author) commented Jan 23, 2022

@dnwe @varun06 @bai

@shweta-fourkites (Author)

After debugging further, it turns out that the messages channel will keep delivering messages as long as there are messages available in the topic, even when other consumer instances are available.
Hence checking only whether the channel was closed was not enough to break the loop within ConsumeClaim, and the consumer was not able to re-join the group during a rebalance.

This caused continuous rebalances as the consumers were out of sync.

I used session.Context().Done() to detect when the session context was cancelled. This happens when there is a rebalance: the loop exits and the consumer re-joins the group.

I've pasted the code sample below for those struggling with similar issues -

func (handler *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// Keep processing until processMessage signals that the claim is done
	// (channel closed or session context cancelled).
	for handler.processMessage(session, claim) {
	}
	return nil
}

// Old code
func (handler *ConsumerGroupHandler) processMessage(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) bool {
	msg, ok := <-claim.Messages()
	if !ok {
		// Channel closed: the claim is finished.
		return false
	}

	// process the message
	session.MarkMessage(msg, "")
	session.Commit()
	return true
}

// Updated code
func (handler *ConsumerGroupHandler) processMessage(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) bool {
	select {
	case msg, ok := <-claim.Messages():
		if !ok {
			// Channel closed: the claim is finished.
			return false
		}

		// process the message
		session.MarkMessage(msg, "")
		session.Commit()
		return true

	case <-session.Context().Done():
		// The session context is cancelled when a rebalance starts:
		// exit so ConsumeClaim returns and the consumer can re-join the group.
		return false
	}
}
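For what it's worth, Sarama's consumergroup example (linked from the fix commit referenced below) uses the same select on session.Context().Done(), and it also checks the parent context in the outer Consume loop so the goroutine stops cleanly on shutdown. A sketch of that outer-loop check, reusing the names from the loop shown earlier in this thread (c.consumer, c.kafkaConfig.Topics, Logger and c.groupName are this thread's names, not Sarama API):

    for {
        if err := c.consumer.Consume(ctx, c.kafkaConfig.Topics, c); err != nil {
            Logger.Error(nil, "[Troubleshooting] Error from consumer: %v", err, c.groupName)
        }
        // If the parent context has been cancelled the application is shutting
        // down, so stop looping instead of calling Consume again.
        if ctx.Err() != nil {
            return
        }
    }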

@dnwe

duglin pushed a commit to cloudevents/sdk-go that referenced this issue Nov 10, 2022
* Patch sarama_kafka rebalance fix

Issue: #817
Issue Explanation: IBM/sarama#2118
Fix reference: https://github.com/Shopify/sarama/blob/5e2c2ef0e429f895c86152189f625bfdad7d3452/examples/consumergroup/main.go#L177

Signed-off-by: nbajaj90