Skip to content

Commit

Permalink
migrate wakeup tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasbru committed Dec 5, 2023
1 parent 5574790 commit 59b8c55
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,82 +205,6 @@ private static Stream<Exception> commitExceptionSupplier() {
new GroupAuthorizationException("Group authorization exception"));
}

@Test
public void testWakeupBeforeCallingPoll() {
final String topicName = "foo";
final int partition = 3;
final TopicPartition tp = new TopicPartition(topicName, partition);
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
consumer.assign(singleton(tp));

consumer.wakeup();

assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}

@Test
public void testWakeupAfterEmptyFetch() {
final String topicName = "foo";
final int partition = 3;
final TopicPartition tp = new TopicPartition(topicName, partition);
doAnswer(invocation -> {
consumer.wakeup();
return Fetch.empty();
}).when(fetchCollector).collectFetch(any(FetchBuffer.class));
Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
consumer.assign(singleton(tp));

assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}

@Test
public void testWakeupAfterNonEmptyFetch() {
final String topicName = "foo";
final int partition = 3;
final TopicPartition tp = new TopicPartition(topicName, partition);
final List<ConsumerRecord<String, String>> records = asList(
new ConsumerRecord<>(topicName, partition, 2, "key1", "value1"),
new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
);
doAnswer(invocation -> {
consumer.wakeup();
return Fetch.forPartition(tp, records, true);
}).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
consumer.assign(singleton(tp));

// since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored
assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
// the previously ignored wake-up should not be ignored in the next call
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
}

@Test
public void testClearWakeupTriggerAfterPoll() {
final String topicName = "foo";
final int partition = 3;
final TopicPartition tp = new TopicPartition(topicName, partition);
final List<ConsumerRecord<String, String>> records = asList(
new ConsumerRecord<>(topicName, partition, 2, "key1", "value1"),
new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
);
doReturn(Fetch.forPartition(tp, records, true))
.when(fetchCollector).collectFetch(any(FetchBuffer.class));
Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
consumer.assign(singleton(tp));

consumer.poll(Duration.ZERO);

assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}

@Test
public void testEnsureCallbackExecutedByApplicationThread() {
final String currentThread = Thread.currentThread().getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
*/
package org.apache.kafka.clients.consumer.internals;

import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand All @@ -40,6 +44,8 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
Expand All @@ -57,6 +63,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsRequest;
Expand Down Expand Up @@ -410,6 +417,90 @@ public void testSubscriptionOnEmptyTopic() {
assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
}

@Test
public void testWakeupBeforeCallingPoll() {
consumer = setup();
final String topicName = "foo";
final int partition = 3;
final TopicPartition tp = new TopicPartition(topicName, partition);
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
consumer.assign(singleton(tp));

consumer.wakeup();

assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}

@Test
public void testWakeupAfterEmptyFetch() {
consumer = setup();
final String topicName = "foo";
final int partition = 3;
final TopicPartition tp = new TopicPartition(topicName, partition);
doAnswer(invocation -> {
consumer.wakeup();
return Fetch.empty();
}).when(fetchCollector).collectFetch(any(FetchBuffer.class));
Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
consumer.assign(singleton(tp));

assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}

@Test
public void testWakeupAfterNonEmptyFetch() {
consumer = setup();
final String topicName = "foo";
final int partition = 3;
final TopicPartition tp = new TopicPartition(topicName, partition);
final List<ConsumerRecord<String, String>> records = asList(
new ConsumerRecord<>(topicName, partition, 2, "key1", "value1"),
new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
);
doAnswer(invocation -> {
consumer.wakeup();
return Fetch.forPartition(tp, records, true);
}).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
consumer.assign(singleton(tp));

// since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored
assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
// the previously ignored wake-up should not be ignored in the next call
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
}

@Test
public void testClearWakeupTriggerAfterPoll() {
consumer = setup();
final String topicName = "foo";
final int partition = 3;
final TopicPartition tp = new TopicPartition(topicName, partition);
final List<ConsumerRecord<String, String>> records = asList(
new ConsumerRecord<>(topicName, partition, 2, "key1", "value1"),
new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
);
doReturn(Fetch.forPartition(tp, records, true))
.when(fetchCollector).collectFetch(any(FetchBuffer.class));
Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
consumer.assign(singleton(tp));

consumer.poll(Duration.ZERO);

assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}

private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
Expand Down

0 comments on commit 59b8c55

Please sign in to comment.