
Calling .nack() suspends consumer group rebalance #2128

Open
tkornai opened this issue Feb 23, 2022 · 11 comments · Fixed by #2131

Comments

@tkornai commented Feb 23, 2022

In what version(s) of Spring for Apache Kafka are you seeing this issue?

I tested the following versions; they all show the same behaviour:

2.3.13, 2.5.14, 2.6.3, 2.8.2

Describe the bug

If a consumer group rebalance is triggered after calling KafkaMessageListenerContainer.nack(), the rebalance does not complete until the nack() sleep time has passed.

The prolonged consumer group rebalance prevents every member of the group from making progress with message processing.

To Reproduce

Please follow the steps described in the sample repository: https://github.com/tkornai/nack-rebalance-demo

Expected behavior

Calling nack() must not delay subsequent consumer group rebalances.

Sample

A link to a GitHub repository with a minimal, reproducible sample.
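For reference, a listener along these lines is enough to hit the problem (a hypothetical sketch, not the sample repository's actual code; the topic, listener id, and 30-second sleep are made up):

```java
// Hypothetical sketch: a listener in MANUAL ack mode that rejects each record
// with a long nack() sleep. Before the fix, the container stopped polling for
// the whole sleep, so a rebalance started during that window could not finish.
@KafkaListener(id = "demo-group", topics = "demo-topic")
void listen(String message, Acknowledgment ack) {
    // Ask the container to redeliver this record (and any following ones)
    // after 30 seconds.
    ack.nack(30_000);
}
```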

@garyrussell (Contributor)

Many thanks for the repro; big help!

Interesting problem; it's going to be tricky to solve it because we don't find out about the rebalance until we next poll the consumer.

I think we'll need to change the architecture to pause the consumer for the nack sleep time and continue polling; the accuracy of the sleep time will then be affected by the poll timeout.
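The proposed approach can be paraphrased as the following fragment (illustrative only, not the actual container code; `consumer`, `nackSleep`, and `pollTimeout` are assumed to be in scope):

```java
// Instead of sleeping between polls, pause the assigned partitions and keep
// calling poll() so the broker still sees the consumer as alive and rebalance
// callbacks can fire promptly.
long nackWake = System.currentTimeMillis() + nackSleep;
consumer.pause(consumer.assignment());               // suspend record delivery only
while (System.currentTimeMillis() <= nackWake) {
    consumer.poll(Duration.ofMillis(pollTimeout));   // returns no records while paused,
                                                     // but participates in rebalances
}
consumer.resume(consumer.paused());                  // resume delivery after the sleep
```

Because the wake-up check only runs once per poll, the achievable sleep resolution is bounded by the poll timeout.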

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Feb 23, 2022
Resolves spring-projects#2128

Suspending polling delays rebalancing; instead pause the consumer and
continue polling. Check if partitions are already paused and only pause
the current active partitions and resume them after the sleep interval
has passed.

Re-pause as necessary after a rebalance.

**cherry-pick to 2.8.x**
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Feb 23, 2022

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Feb 23, 2022
Also tested with reporter's reproducer.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Mar 7, 2022

artembilan pushed a commit that referenced this issue Mar 7, 2022

artembilan pushed a commit that referenced this issue Mar 7, 2022
@jffourmond commented May 4, 2022

Hi, following this fix, nack ignores the sleep time parameter: messages are re-consumed 5 seconds later, no matter what the sleep time value is. Is there something I can do about it?

@garyrussell (Contributor)

The default poll timeout is 5 seconds, so that's a smoking gun.

I'll take a look, but the logic looks correct to me.

When we receive the nack, we set nackWake:

this.nackWake = System.currentTimeMillis() + this.nackSleep;

and resume only when

if (System.currentTimeMillis() > this.nackWake) {

@garyrussell (Contributor)

Works as expected for me:

@SpringBootApplication
public class Kgh2128Application {

	private static final Logger log = LoggerFactory.getLogger(Kgh2128Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Kgh2128Application.class, args);
	}

	@KafkaListener(id = "kgh2128", topics = "kgh2128")
	void listen(String in, Acknowledgment ack) {
		log.info(in);
		ack.nack(15_000);
	}

	@Bean
	public NewTopic topic() {
		return TopicBuilder.name("kgh2128").partitions(1).replicas(1).build();
	}

	@Bean
	ApplicationRunner runner(KafkaTemplate<String, String> template) {
		return args -> {
			template.send("kgh2128", "foo");
			template.send("kgh2128", "bar");
		};
	}

}
2022-05-04 09:41:41.808  INFO 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : kgh2128: partitions assigned: [kgh2128-0]
2022-05-04 09:41:41.816 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 4 records
2022-05-04 09:41:41.818 DEBUG 19459 --- [  kgh2128-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=foo, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4ac3a19e, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=kgh2128, kafka_receivedTimestamp=1651671626961, kafka_acknowledgment=Acknowledgment for kgh2128-0@0, kafka_groupId=kgh2128}]]
2022-05-04 09:41:41.819  INFO 19459 --- [  kgh2128-0-C-1] com.example.demo.Kgh2128Application      : foo
2022-05-04 09:41:41.819 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-04 09:41:41.821  INFO 19459 --- [  kgh2128-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh2128-1, groupId=kgh2128] Seeking to offset 0 for partition kgh2128-0
2022-05-04 09:41:41.821 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Pausing for nack sleep: [kgh2128-0]
2022-05-04 09:41:41.821 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-04 09:41:41.821 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2022-05-04 09:41:46.822 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-05-04 09:41:46.823 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-04 09:41:46.823 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2022-05-04 09:41:51.824 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-05-04 09:41:51.824 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-04 09:41:51.824 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2022-05-04 09:41:56.826 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2022-05-04 09:41:56.827 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Resumed after nack sleep: [kgh2128-0]
2022-05-04 09:41:56.827 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2022-05-04 09:41:56.830 DEBUG 19459 --- [  kgh2128-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 4 records
2022-05-04 09:41:56.830 DEBUG 19459 --- [  kgh2128-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=foo, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4ac3a19e, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=kgh2128, kafka_receivedTimestamp=1651671626961, kafka_acknowledgment=Acknowledgment for kgh2128-0@0, kafka_groupId=kgh2128}]]
2022-05-04 09:41:56.830  INFO 19459 --- [  kgh2128-0-C-1] com.example.demo.Kgh2128Application      : foo

@jffourmond Please provide a similar example that exhibits the behavior you are seeing.

@jffourmond commented May 4, 2022

Thank you @garyrussell for your answer.

I think the difference in behavior we observe lies in the sleep time value we use. If I call ack.nack(15_000) like you do, my message is re-consumed after 15 s as expected. But my sleep time value is 1000, because I want to retry every second. It used to work as I wanted in spring-kafka 2.8.3.

Is it a bad practice to set a sleep time < DEFAULT_POLL_TIMEOUT?

@garyrussell (Contributor)

I wouldn't call it an anti-pattern, but with this change, the actual sleep time will be max(sleep, pollTimeout), so you should reduce the poll timeout accordingly.

I will update the documentation.
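Reducing the poll timeout could look something like this (a hypothetical Spring Boot configuration sketch; the bean method and generic types are assumptions, not code from this thread):

```java
// Hypothetical configuration sketch: shrink the container's poll timeout so
// that short nack() sleeps take effect sooner. Names here are illustrative.
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Default is 5000 ms; a 1000 ms nack sleep needs a poll timeout no larger
    // than that to resume on time.
    factory.getContainerProperties().setPollTimeout(1000);
    return factory;
}
```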

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue May 4, 2022
artembilan pushed a commit that referenced this issue May 4, 2022
See #2128

**cherry-pick to 2.9.x, 2.8.x**
artembilan pushed a commit that referenced this issue May 4, 2022

artembilan pushed a commit that referenced this issue May 4, 2022
@jffourmond

Thanks @garyrussell. I'm not sure the max(sleep, pollTimeout) formula is accurate. If pollTimeout is 5000 and sleep is 6000, the actual sleep time is 10000 in my app.

@garyrussell (Contributor)

Yes, I realized that when I updated the documentation: https://github.com/spring-projects/spring-kafka/pull/2254/files

IMPORTANT: The consumer is paused during the sleep so that we continue to poll the broker to keep the consumer alive.
The actual sleep time, and its resolution depends on the container's maxPollInterval which defaults to 5 seconds.
The minimum sleep time is equal to the maxPollInterval and all sleep times will be a multiple of it.
For small sleep times, consider reducing the container's maxPollInterval.
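The rounding described above fits a simple model of the poll loop (a toy calculation, not the container's actual code): while paused, each poll() blocks for the full poll timeout and the wake-up check runs once per poll, so the effective sleep is the smallest multiple of the poll timeout at or above the requested sleep.

```java
// Toy model of the nack sleep's interaction with the poll loop: the
// "still paused" check runs once per poll cycle, so the effective sleep
// rounds up to the next multiple of the poll timeout.
public class NackSleepModel {

    static long effectiveSleep(long nackSleepMs, long pollTimeoutMs) {
        long elapsed = 0;
        // Each iteration models one poll() that blocks for the full timeout
        // because the partitions are paused and no records arrive.
        while (elapsed < nackSleepMs) {
            elapsed += pollTimeoutMs;
        }
        return elapsed;
    }

    public static void main(String[] args) {
        // jffourmond's case: a 6000 ms sleep with a 5000 ms poll timeout
        System.out.println(effectiveSleep(6_000, 5_000)); // prints 10000
    }
}
```

This reproduces both reports in the thread: a 1000 ms sleep becomes 5000 ms, and a 6000 ms sleep becomes 10000 ms.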

@garyrussell (Contributor)

However, that says maxPollInterval, I meant pollTimeout; will fix and further clarify.

garyrussell added a commit that referenced this issue May 6, 2022
garyrussell added a commit that referenced this issue May 6, 2022
garyrussell added a commit that referenced this issue May 6, 2022
@tasosz commented Jun 27, 2024

Hi @garyrussell

However, that says maxPollInterval, I meant pollTimeout; will fix and further clarify.

The javadocs on org.springframework.kafka.support.Acknowledgment.nack(Duration sleep) and org.springframework.kafka.support.Acknowledgment.nack(int index, Duration sleep) also need updating:

  @param sleep the duration to sleep; the actual sleep time will be larger of this value and the container's {@code maxPollInterval}, which defaults to 5 seconds.

@sobychacko (Contributor)

@tasosz You are welcome to send a PR addressing this. Or we can look at this later. Reopening the issue. Thanks!

sobychacko reopened this Jun 27, 2024