Skip to content

Commit

Permalink
GH-2128 Do Not Sleep Consumer Thread for Nack
Browse files Browse the repository at this point in the history
Resolves #2128

Sleeping on the consumer thread suspends polling, which delays rebalancing;
instead, pause the consumer and continue polling. Pause only the assigned
partitions that are not already paused, and resume them after the sleep
interval has passed.

Re-pause as necessary after a rebalance.

Also tested with the reporter's reproducer.

**cherry-pick to 2.8.x**
  • Loading branch information
garyrussell authored and artembilan committed Mar 7, 2022
1 parent 220262d commit be4b1bf
Show file tree
Hide file tree
Showing 6 changed files with 363 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final Header infoHeader = new RecordHeader(KafkaHeaders.LISTENER_INFO, this.listenerinfo);

private final Set<TopicPartition> pausedForNack = new HashSet<>();

private Map<TopicPartition, OffsetMetadata> definedPartitions;

private int count;
Expand All @@ -728,6 +730,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private long nackSleep = -1;

private long nackWake;

private int nackIndex;

private Iterator<TopicPartition> batchIterator;
Expand Down Expand Up @@ -1597,6 +1601,10 @@ private void pauseConsumerIfNecessary() {
}

private void doPauseConsumerIfNecessary() {
if (this.pausedForNack.size() > 0) {
this.logger.debug("Still paused for nack sleep");
return;
}
if (this.offsetsInThisBatch != null && this.offsetsInThisBatch.size() > 0 && !this.pausedForAsyncAcks) {
this.pausedForAsyncAcks = true;
this.logger.debug(() -> "Pausing for incomplete async acks: " + this.offsetsInThisBatch);
Expand All @@ -1610,7 +1618,15 @@ private void doPauseConsumerIfNecessary() {
}

private void resumeConsumerIfNeccessary() {
if (this.offsetsInThisBatch != null) {
if (this.nackWake > 0) {
if (System.currentTimeMillis() > this.nackWake) {
this.nackWake = 0;
this.consumer.resume(this.pausedForNack);
this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
this.pausedForNack.clear();
}
}
else if (this.offsetsInThisBatch != null) {
synchronized (this) {
doResumeConsumerIfNeccessary();
}
Expand Down Expand Up @@ -1654,12 +1670,10 @@ private void pausePartitionsIfNecessary() {
}

private void resumePartitionsIfNecessary() {
Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
List<TopicPartition> partitionsToResume = this
.assignedPartitions
List<TopicPartition> partitionsToResume = getAssignedPartitions()
.stream()
.filter(tp -> !isPartitionPauseRequested(tp)
&& pausedConsumerPartitions.contains(tp))
&& this.pausedPartitions.contains(tp))
.collect(Collectors.toList());
if (partitionsToResume.size() > 0) {
this.consumer.resume(partitionsToResume);
Expand Down Expand Up @@ -2206,7 +2220,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
processCommits();
}
SeekUtils.doSeeks(toSeek, this.consumer, null, true, (rec, ex) -> false, this.logger); // NOSONAR
nackSleepAndReset();
pauseForNackSleep();
}
}

Expand Down Expand Up @@ -2467,17 +2481,29 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
}
}
SeekUtils.doSeeks(list, this.consumer, null, true, (rec, ex) -> false, this.logger); // NOSONAR
nackSleepAndReset();
pauseForNackSleep();
}

private void nackSleepAndReset() {
try {
ListenerUtils.stoppableSleep(KafkaMessageListenerContainer.this.thisOrParentContainer, this.nackSleep);
}
catch (@SuppressWarnings(UNUSED) InterruptedException e) {
Thread.currentThread().interrupt();
/**
 * Start a nack "sleep" by pausing partitions rather than blocking the calling thread.
 * When {@code nackSleep} is positive, the wake-up time is stored in {@code nackWake}
 * and the assigned partitions that the consumer does not already report as paused
 * are added to {@code pausedForNack} and paused. {@code nackSleep} is always reset
 * to -1 before returning.
 */
private void pauseForNackSleep() {
if (this.nackSleep > 0) {
// Absolute wall-clock time (ms) at which the nack pause is due to end.
this.nackWake = System.currentTimeMillis() + this.nackSleep;
this.nackSleep = -1;
// Snapshot what is paused before we pause anything, so that partitions paused
// for other reasons are not tracked in pausedForNack.
Set<TopicPartition> alreadyPaused = this.consumer.paused();
// NOTE(review): assumes getAssignedPartitions() never returns null here - confirm.
this.pausedForNack.addAll(getAssignedPartitions());
this.pausedForNack.removeAll(alreadyPaused);
this.logger.debug(() -> "Pausing for nack sleep: " + ListenerConsumer.this.pausedForNack);
try {
this.consumer.pause(this.pausedForNack);
}
catch (IllegalStateException ex) {
// this should never happen; defensive, just in case...
this.logger.warn(() -> "Could not pause for nack, possible rebalance in process: "
+ ex.getMessage());
// Undo only what became paused since the snapshot; partitions that were
// already paused beforehand stay paused.
// NOTE(review): pausedForNack and nackWake are not reset on this path - confirm
// that this is intended.
Set<TopicPartition> nowPaused = new HashSet<>(this.consumer.paused());
nowPaused.removeAll(alreadyPaused);
this.consumer.resume(nowPaused);
}
}
// Also resets the value when the branch above was skipped (nackSleep <= 0).
this.nackSleep = -1;
}

/**
Expand Down Expand Up @@ -3251,6 +3277,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
if (ListenerConsumer.this.assignedPartitions != null) {
ListenerConsumer.this.assignedPartitions.removeAll(partitions);
}
ListenerConsumer.this.pausedForNack.removeAll(partitions);
partitions.forEach(tp -> ListenerConsumer.this.lastCommits.remove(tp));
synchronized (ListenerConsumer.this) {
if (ListenerConsumer.this.offsetsInThisBatch != null) {
Expand All @@ -3275,6 +3302,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
+ "consumer paused again, so the initial poll() will never return any records");
}
if (ListenerConsumer.this.pausedForNack.size() > 0) {
ListenerConsumer.this.consumer.pause(ListenerConsumer.this.pausedForNack);
}
ListenerConsumer.this.assignedPartitions.addAll(partitions);
if (ListenerConsumer.this.commitCurrentOnAssignment
&& !collectAndCommitIfNecessary(partitions)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -197,7 +197,12 @@ public Consumer consumer() {
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
new RecordHeaders(), Optional.empty())));
final AtomicInteger which = new AtomicInteger();
final AtomicBoolean paused = new AtomicBoolean();
willAnswer(i -> {
if (paused.get()) {
Thread.sleep(10);
return ConsumerRecords.empty();
}
this.pollLatch.countDown();
switch (which.getAndIncrement()) {
case 0:
Expand All @@ -211,9 +216,20 @@ public Consumer consumer() {
catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
}
return new ConsumerRecords(Collections.emptyMap());
return ConsumerRecords.empty();
}
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
willAnswer(i -> {
return Collections.emptySet();
}).given(consumer).paused();
willAnswer(i -> {
paused.set(true);
return null;
}).given(consumer).pause(any());
willAnswer(i -> {
paused.set(false);
return null;
}).given(consumer).resume(any());
willAnswer(i -> {
this.commitLatch.countDown();
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -77,7 +78,7 @@
@SpringJUnitConfig
@DirtiesContext
@SuppressWarnings("deprecation")
public class ManualNackRecordTxTests {
public class ManualNackBatchTxTests {

@SuppressWarnings("rawtypes")
@Autowired
Expand All @@ -102,6 +103,7 @@ public class ManualNackRecordTxTests {
@Test
public void discardRemainingRecordsFromPollAndSeek() throws Exception {
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.replayTime).isBetween(50L, 30_000L);
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
this.registry.stop();
Expand All @@ -128,24 +130,27 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
@EnableKafka
public static class Config {

private final List<List<String>> contents = new ArrayList<>();
final List<List<String>> contents = new ArrayList<>();

private final CountDownLatch pollLatch = new CountDownLatch(3);
final CountDownLatch pollLatch = new CountDownLatch(3);

private final CountDownLatch deliveryLatch = new CountDownLatch(2);
final CountDownLatch deliveryLatch = new CountDownLatch(2);

private final CountDownLatch closeLatch = new CountDownLatch(1);
final CountDownLatch closeLatch = new CountDownLatch(1);

private final CountDownLatch commitLatch = new CountDownLatch(2);
final CountDownLatch commitLatch = new CountDownLatch(2);

private int count;
volatile int count;

volatile long replayTime;

@KafkaListener(topics = "foo", groupId = "grp")
public void foo(List<String> in, Acknowledgment ack) {
this.contents.add(in);
this.replayTime = System.currentTimeMillis() - this.replayTime;
this.deliveryLatch.countDown();
if (++this.count == 1) { // part 1, offset 1, first time
ack.nack(3, 0);
ack.nack(3, 50);
}
else {
ack.acknowledge();
Expand Down Expand Up @@ -196,7 +201,12 @@ public Consumer consumer() {
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
new RecordHeaders(), Optional.empty())));
final AtomicInteger which = new AtomicInteger();
final AtomicBoolean paused = new AtomicBoolean();
willAnswer(i -> {
if (paused.get()) {
Thread.sleep(10);
return ConsumerRecords.empty();
}
this.pollLatch.countDown();
switch (which.getAndIncrement()) {
case 0:
Expand All @@ -210,9 +220,20 @@ public Consumer consumer() {
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return new ConsumerRecords(Collections.emptyMap());
return ConsumerRecords.empty();
}
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
willAnswer(i -> {
return Collections.emptySet();
}).given(consumer).paused();
willAnswer(i -> {
paused.set(true);
return null;
}).given(consumer).pause(any());
willAnswer(i -> {
paused.set(false);
return null;
}).given(consumer).resume(any());
willAnswer(i -> {
this.commitLatch.countDown();
return null;
Expand Down
Loading

0 comments on commit be4b1bf

Please sign in to comment.