From e5aed9dd5efb1e8ee1f1d84a56a7de7da548d472 Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Mon, 12 Jun 2023 15:02:48 -0400 Subject: [PATCH] KAFKA-14965 #2 Improvements for ListOffsetEvent based on OffsetAndTimestamp and ListOffsetResult & Tests --- .../internals/ListOffsetsRequestManager.java | 57 +-- .../internals/PrototypeAsyncConsumer.java | 11 +- .../events/ApplicationEventProcessor.java | 2 +- .../events/ListOffsetsApplicationEvent.java | 29 +- .../ListOffsetsRequestManagerTest.java | 405 +++++++++++++----- 5 files changed, 358 insertions(+), 146 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ListOffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ListOffsetsRequestManager.java index 39a52df9105d0..f30427693d75a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ListOffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ListOffsetsRequestManager.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.StaleMetadataException; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData; import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult; import org.apache.kafka.common.ClusterResource; @@ -44,15 +45,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.function.Function; import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; /** - * Build requests for retrieving partition offsets from partition leaders (see - * {@link #fetchOffsets(Set, long, boolean)}). Requests are kept in-memory - * ready to be sent on the next call to {@link #poll(long)}. + * Manager responsible for building requests to retrieve partition offsets (see + * {@link #fetchOffsets(Map, boolean)}). Requests are kept in-memory ready to be sent on the next + * call to {@link #poll(long)}. *

* Partition leadership information required to build the requests is retrieved from the * {@link ConsumerMetadata}, so this implements {@link ClusterResourceListener} to get notified @@ -109,24 +109,24 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { return pollResult; } + /** * Retrieve offsets for the given partitions and timestamp. * - * @param partitions Partitions to get offsets for - * @param timestamp Target time to look offsets for - * @param requireTimestamps True if this should fail with an UnsupportedVersionException if - * the broker does not support fetching precise timestamps for offsets - * @return Future containing the map of offsets retrieved for each partition. The future will - * complete when the responses for the requests are received and processed following a call - * to {@link #poll(long)} + * @param timestampsToSearch Partitions and target timestamps to get offsets for + * @return Future containing the map of {@link TopicPartition} and {@link OffsetAndTimestamp} + * found (offset of the first message whose timestamp is greater than or equals to the target + * timestamp).The future will complete when the requests responses are received and + * processed, following a call to {@link #poll(long)} */ - public CompletableFuture> fetchOffsets(final Set partitions, - final long timestamp, - final boolean requireTimestamps) { - metadata.addTransientTopics(offsetFetcherUtils.topicsForPartitions(partitions)); + public CompletableFuture> fetchOffsets( + final Map timestampsToSearch, + final boolean requireTimestamps) { + if (timestampsToSearch.isEmpty()) { + return CompletableFuture.completedFuture(Collections.emptyMap()); + } + metadata.addTransientTopics(offsetFetcherUtils.topicsForPartitions(timestampsToSearch.keySet())); - Map timestampsToSearch = partitions.stream() - .collect(Collectors.toMap(Function.identity(), tp -> timestamp)); ListOffsetsRequestState listOffsetsRequestState = new ListOffsetsRequestState( timestampsToSearch, requireTimestamps, @@ -134,13 +134,17 @@ public CompletableFuture> fetchOffsets(final Set { metadata.clearTransientTopics(); - log.debug("Fetch offsets completed for partitions {} and timestamp {}. Result {}, " + - "error", partitions, timestamp, result, error); + log.debug("Fetch offsets completed for partitions and timestamps {}. Result {}, " + + "error", timestampsToSearch, result, error); + }).exceptionally(error -> { + log.error(error.getMessage()); + return null; }); fetchOffsetsByTimes(timestampsToSearch, requireTimestamps, listOffsetsRequestState); - return listOffsetsRequestState.globalResult; + return listOffsetsRequestState.globalResult.thenApply(result -> + offsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, result.fetchedOffsets)); } /** @@ -150,9 +154,14 @@ public CompletableFuture> fetchOffsets(final Set timestampsToSearch, final boolean requireTimestamps, final ListOffsetsRequestState listOffsetsRequestState) { + if (timestampsToSearch.isEmpty()) { + // Early return if empty map to avoid wrongfully raising StaleMetadataException on + // empty grouping + return; + } try { - List unsentRequests = - sendListOffsetsRequests(timestampsToSearch, requireTimestamps, listOffsetsRequestState); + List unsentRequests = sendListOffsetsRequests( + timestampsToSearch, requireTimestamps, listOffsetsRequestState); requestsToSend.addAll(unsentRequests); } catch (StaleMetadataException e) { requestsToRetry.add(listOffsetsRequestState); @@ -208,7 +217,7 @@ private List sendListOffsetsRequests( ListOffsetResult listOffsetResult = new ListOffsetResult(listOffsetsRequestState.fetchedOffsets, listOffsetsRequestState.remainingToSearch.keySet()); - listOffsetsRequestState.globalResult.complete(listOffsetResult.offsetAndMetadataMap()); + listOffsetsRequestState.globalResult.complete(listOffsetResult); } else { requestsToRetry.add(listOffsetsRequestState); } @@ -270,7 +279,7 @@ private static class ListOffsetsRequestState { private final Map timestampsToSearch; private final Map fetchedOffsets; private final Map remainingToSearch; - private final CompletableFuture> globalResult; + private final CompletableFuture globalResult; final boolean requireTimestamps; final OffsetFetcherUtils offsetFetcherUtils; final IsolationLevel isolationLevel; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java index becb6cf2e02ba..523be1e0e1557 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java @@ -68,6 +68,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -406,11 +407,15 @@ private Map beginningOrEndOffset(Collection timestampToSearch = + partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> timestamp)); final ListOffsetsApplicationEvent listOffsetsEvent = new ListOffsetsApplicationEvent( - partitions.stream().collect(Collectors.toSet()), - timestamp, + timestampToSearch, false); - return eventHandler.addAndGet(listOffsetsEvent, timeout); + Map offsetAndTimestampMap = + eventHandler.addAndGet(listOffsetsEvent, timeout); + return offsetAndTimestampMap.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), + e -> e.getValue().offset())); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 23619beaebc28..e439365b62f2d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -127,7 +127,7 @@ private boolean process(final UnsubscribeApplicationEvent event) { } private boolean process(final ListOffsetsApplicationEvent event) { - requestManagers.listOffsetsRequestManager.fetchOffsets(event.partitions, event.timestamp, + requestManagers.listOffsetsRequestManager.fetchOffsets(event.timestampsToSearch, event.requireTimestamps) .whenComplete((result, error) -> { if (error != null) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java index 3a5a49cbc8c37..4afd0ea70b0ae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java @@ -16,42 +16,41 @@ */ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.TopicPartition; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; /** - * Event for retrieving partition offsets from the partition leader - * by performing a {@link org.apache.kafka.common.requests.ListOffsetsRequest} + * Event for retrieving partition offsets by performing a + * {@link org.apache.kafka.common.requests.ListOffsetsRequest ListOffsetsRequest}. + * This event is created with a map of {@link TopicPartition} and target timestamps to search + * offsets for. It is completed with a map of {@link TopicPartition} and the + * {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than + * or equals to the target timestamp) */ -public class ListOffsetsApplicationEvent extends CompletableApplicationEvent> { - private final CompletableFuture> future; +public class ListOffsetsApplicationEvent extends CompletableApplicationEvent> { + private final CompletableFuture> future; - final Set partitions; - final long timestamp; + final Map timestampsToSearch; final boolean requireTimestamps; - public ListOffsetsApplicationEvent(final Set partitions, - long timestamp, - boolean requireTimestamps) { + public ListOffsetsApplicationEvent(Map timestampToSearch, boolean requireTimestamps) { super(Type.LIST_OFFSETS); - this.partitions = partitions; - this.timestamp = timestamp; + this.timestampsToSearch = timestampToSearch; this.requireTimestamps = requireTimestamps; this.future = new CompletableFuture<>(); } - public CompletableFuture> future() { + public CompletableFuture> future() { return future; } @Override public String toString() { return "ListOffsetsApplicationEvent {" + - "partitions=" + partitions + ", " + - "target timestamp=" + timestamp + ", " + + "timestampsToSearch=" + timestampsToSearch + ", " + "requireTimestamps=" + requireTimestamps + '}'; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ListOffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ListOffsetsRequestManagerTest.java index 0a26b748fcafe..c2a15ba03c8fd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ListOffsetsRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ListOffsetsRequestManagerTest.java @@ -19,12 +19,14 @@ import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.message.ListOffsetsResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -35,6 +37,7 @@ import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -51,6 +54,8 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -89,33 +94,120 @@ public void setup() { @Test public void testListOffsetsRequest_Success() throws ExecutionException, InterruptedException { - Map partitionsOffsets = new HashMap<>(); - long offset = ListOffsetsRequest.EARLIEST_TIMESTAMP; - partitionsOffsets.put(TEST_PARTITION_1, offset); + Map timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1, + ListOffsetsRequest.EARLIEST_TIMESTAMP); expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); - CompletableFuture> result = requestManager.fetchOffsets( - partitionsOffsets.keySet(), - ListOffsetsRequest.EARLIEST_TIMESTAMP, + CompletableFuture> result = requestManager.fetchOffsets( + timestampsToSearch, false); assertEquals(1, requestManager.requestsToSend()); assertEquals(0, requestManager.requestsToRetry()); - verifySuccessfulPollAndResponseReceived(result, partitionsOffsets); + Map expectedOffsets = Collections.singletonMap(TEST_PARTITION_1, new OffsetAndTimestamp(5L, 1L)); + verifySuccessfulPollAndResponseReceived(result, expectedOffsets); } @Test - public void testBuildingRequestFails_RetrySucceeds() throws ExecutionException, + public void testListOffsetsWaitingForMetadataUpdate_Timeout() { + Map timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1, + ListOffsetsRequest.EARLIEST_TIMESTAMP); + + // Building list offsets request fails with unknown leader + expectFailedRequest_MissingLeader(); + CompletableFuture> fetchOffsetsFuture = + requestManager.fetchOffsets(timestampsToSearch, false); + assertEquals(0, requestManager.requestsToSend()); + assertEquals(1, requestManager.requestsToRetry()); + NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds()); + assertEquals(0, res.unsentRequests.size()); + // Metadata update not happening within the time boundaries of the request future, so + // future should time out. + assertThrows(TimeoutException.class, () -> fetchOffsetsFuture.get(100L, TimeUnit.MILLISECONDS)); + } + + @Test + public void testListOffsetsRequestMultiplePartitions() throws ExecutionException, + InterruptedException { + Map timestampsToSearch = new HashMap<>(); + timestampsToSearch.put(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP); + timestampsToSearch.put(TEST_PARTITION_2, ListOffsetsRequest.EARLIEST_TIMESTAMP); + + + Map partitionLeaders = new HashMap<>(); + partitionLeaders.put(TEST_PARTITION_1, LEADER_1); + partitionLeaders.put(TEST_PARTITION_2, LEADER_1); + expectSuccessfulRequest(partitionLeaders); + CompletableFuture> result = requestManager.fetchOffsets( + timestampsToSearch, + false); + assertEquals(1, requestManager.requestsToSend()); + assertEquals(0, requestManager.requestsToRetry()); + + Map expectedOffsets = timestampsToSearch.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey(), e -> new OffsetAndTimestamp(5L, 1L))); + verifySuccessfulPollAndResponseReceived(result, expectedOffsets); + } + + @Test + public void testListOffsetsRequestEmpty() throws ExecutionException, InterruptedException { + CompletableFuture> result = requestManager.fetchOffsets( + Collections.emptyMap(), + false); + assertEquals(0, requestManager.requestsToSend()); + assertEquals(0, requestManager.requestsToRetry()); + + NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds()); + assertTrue(pollResult.unsentRequests.isEmpty()); + + assertEquals(0, requestManager.requestsToRetry()); + assertEquals(0, requestManager.requestsToSend()); + + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + assertTrue(result.get().isEmpty()); + } + + @Test + public void testListOffsetsRequestUnknownOffset() throws ExecutionException, + InterruptedException { + Map timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1, + ListOffsetsRequest.EARLIEST_TIMESTAMP); + + expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); + CompletableFuture> result = requestManager.fetchOffsets( + timestampsToSearch, + false); + assertEquals(1, requestManager.requestsToSend()); + assertEquals(0, requestManager.requestsToRetry()); + + // verifySuccessfulPollAndResponseReceived(result, expectedOffsets); + + List topicResponses = Collections.singletonList( + buildTopicResponse(TEST_PARTITION_1, Errors.NONE, + ListOffsetsResponse.UNKNOWN_TIMESTAMP, ListOffsetsResponse.UNKNOWN_OFFSET)); + + NetworkClientDelegate.PollResult retriedPoll = requestManager.poll(time.milliseconds()); + verifySuccessfulPoll(retriedPoll); + NetworkClientDelegate.UnsentRequest unsentRequest = retriedPoll.unsentRequests.get(0); + ClientResponse clientResponse = buildClientResponse(unsentRequest, topicResponses); + clientResponse.onComplete(); + Map expectedOffsets = + Collections.singletonMap(TEST_PARTITION_1, null); + verifyRequestSuccessfullyCompleted(result, expectedOffsets); + + } + + @Test + public void testListOffsetsWaitingForMetadataUpdate_RetrySucceeds() throws ExecutionException, InterruptedException { - Map partitionsOffsets = new HashMap<>(); - long offset = 1L; - partitionsOffsets.put(TEST_PARTITION_1, offset); + Map timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1, + ListOffsetsRequest.EARLIEST_TIMESTAMP); // Building list offsets request fails with unknown leader expectFailedRequest_MissingLeader(); - CompletableFuture> fetchOffsetsFuture = - requestManager.fetchOffsets(partitionsOffsets.keySet(), - ListOffsetsRequest.EARLIEST_TIMESTAMP, + CompletableFuture> fetchOffsetsFuture = + requestManager.fetchOffsets(timestampsToSearch, false); assertEquals(0, requestManager.requestsToSend()); assertEquals(1, requestManager.requestsToRetry()); @@ -129,21 +221,21 @@ public void testBuildingRequestFails_RetrySucceeds() throws ExecutionException, requestManager.onUpdate(new ClusterResource("")); assertEquals(1, requestManager.requestsToSend()); - verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, partitionsOffsets); + Map expectedOffsets = Collections.singletonMap( + TEST_PARTITION_1, new OffsetAndTimestamp(5L, 1L)); + verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, expectedOffsets); } @ParameterizedTest @MethodSource("retriableErrors") public void testRequestFailsWithRetriableError_RetrySucceeds(Errors error) throws ExecutionException, InterruptedException { - Map partitionsOffsets = new HashMap<>(); - long offset = 1L; - partitionsOffsets.put(TEST_PARTITION_1, offset); + Map timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1, + ListOffsetsRequest.EARLIEST_TIMESTAMP); // List offsets request that is successfully built expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); - CompletableFuture> fetchOffsetsFuture = requestManager.fetchOffsets( - partitionsOffsets.keySet(), - ListOffsetsRequest.EARLIEST_TIMESTAMP, + CompletableFuture> fetchOffsetsFuture = requestManager.fetchOffsets( + timestampsToSearch, false); assertEquals(1, requestManager.requestsToSend()); assertEquals(0, requestManager.requestsToRetry()); @@ -155,10 +247,10 @@ public void testRequestFailsWithRetriableError_RetrySucceeds(Errors error) throw // Failed response received NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0); - unsentRequest.future().complete( - buildClientResponseWithErrors( - unsentRequest, - Collections.singletonMap(TEST_PARTITION_1, error))); + ClientResponse clientResponse = buildClientResponseWithErrors( + unsentRequest, + Collections.singletonMap(TEST_PARTITION_1, error)); + clientResponse.onComplete(); assertFalse(fetchOffsetsFuture.isDone()); assertEquals(1, requestManager.requestsToRetry()); assertEquals(0, requestManager.requestsToSend()); @@ -168,35 +260,26 @@ public void testRequestFailsWithRetriableError_RetrySucceeds(Errors error) throw requestManager.onUpdate(new ClusterResource("")); assertEquals(1, requestManager.requestsToSend()); - verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, partitionsOffsets); - } - - private static Stream retriableErrors() { - return Stream.of( - Arguments.of(Errors.NOT_LEADER_OR_FOLLOWER), - Arguments.of(Errors.REPLICA_NOT_AVAILABLE), - Arguments.of(Errors.KAFKA_STORAGE_ERROR), - Arguments.of(Errors.OFFSET_NOT_AVAILABLE), - Arguments.of(Errors.LEADER_NOT_AVAILABLE), - Arguments.of(Errors.FENCED_LEADER_EPOCH), - Arguments.of(Errors.UNKNOWN_LEADER_EPOCH)); + Map expectedOffsets = Collections.singletonMap(TEST_PARTITION_1, new OffsetAndTimestamp(5L, 1L)); + verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, expectedOffsets); } @Test public void testRequestPartiallyFailsWithRetriableError_RetrySucceeds() throws ExecutionException, InterruptedException { - Map partitionsOffsets = new HashMap<>(); - long offset = ListOffsetsRequest.EARLIEST_TIMESTAMP; - partitionsOffsets.put(TEST_PARTITION_1, offset); - partitionsOffsets.put(TEST_PARTITION_2, offset); + Map timestampsToSearch = new HashMap<>(); + timestampsToSearch.put(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP); + timestampsToSearch.put(TEST_PARTITION_2, ListOffsetsRequest.EARLIEST_TIMESTAMP); + + Map expectedOffsets = timestampsToSearch.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey(), e -> new OffsetAndTimestamp(5L, 1L))); // List offsets request to 2 brokers successfully built Map partitionLeaders = new HashMap<>(); partitionLeaders.put(TEST_PARTITION_1, LEADER_1); partitionLeaders.put(TEST_PARTITION_2, LEADER_2); expectSuccessfulRequest(partitionLeaders); - CompletableFuture> fetchOffsetsFuture = requestManager.fetchOffsets( - partitionsOffsets.keySet(), - ListOffsetsRequest.EARLIEST_TIMESTAMP, + CompletableFuture> fetchOffsetsFuture = requestManager.fetchOffsets( + timestampsToSearch, false); assertEquals(2, requestManager.requestsToSend()); assertEquals(0, requestManager.requestsToRetry()); @@ -209,15 +292,15 @@ public void testRequestPartiallyFailsWithRetriableError_RetrySucceeds() throws E // Mixed response with failures and successes. Offsets successfully fetched from one // broker but retriable UNKNOWN_LEADER_EPOCH received from second broker. NetworkClientDelegate.UnsentRequest unsentRequest1 = res.unsentRequests.get(0); - unsentRequest1.future().complete( - buildClientResponseWithOffsets( - unsentRequest1, - Collections.singletonMap(TEST_PARTITION_1, offset))); + ClientResponse clientResponse1 = buildClientResponse( + unsentRequest1, + Collections.singletonMap(TEST_PARTITION_1, expectedOffsets.get(TEST_PARTITION_1))); + clientResponse1.onComplete(); NetworkClientDelegate.UnsentRequest unsentRequest2 = res.unsentRequests.get(1); - unsentRequest2.future().complete( - buildClientResponseWithErrors( - unsentRequest2, - Collections.singletonMap(TEST_PARTITION_2, Errors.UNKNOWN_LEADER_EPOCH))); + ClientResponse clientResponse2 = buildClientResponseWithErrors( + unsentRequest2, + Collections.singletonMap(TEST_PARTITION_2, Errors.UNKNOWN_LEADER_EPOCH)); + clientResponse2.onComplete(); assertFalse(fetchOffsetsFuture.isDone()); assertEquals(1, requestManager.requestsToRetry()); @@ -232,25 +315,26 @@ public void testRequestPartiallyFailsWithRetriableError_RetrySucceeds() throws E NetworkClientDelegate.PollResult retriedPoll = requestManager.poll(time.milliseconds()); verifySuccessfulPoll(retriedPoll); NetworkClientDelegate.UnsentRequest unsentRequest = retriedPoll.unsentRequests.get(0); - unsentRequest.future().complete(buildClientResponseWithOffsets(unsentRequest, Collections.singletonMap(TEST_PARTITION_2, offset))); + ClientResponse clientResponse = buildClientResponse(unsentRequest, + Collections.singletonMap(TEST_PARTITION_2, expectedOffsets.get(TEST_PARTITION_2))); + clientResponse.onComplete(); // Verify global result with the offset initially retrieved, and the offset that // initially failed but succeeded after a metadata update - verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, partitionsOffsets); + verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, expectedOffsets); } @Test public void testRequestFailedResponse_NonRetriableAuthError() { - Map partitionsOffsets = new HashMap<>(); - long offset = 1L; - partitionsOffsets.put(TEST_PARTITION_1, offset); + Map timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1, + ListOffsetsRequest.EARLIEST_TIMESTAMP); // List offsets request that is successfully built expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); - CompletableFuture> fetchOffsetsFuture = requestManager.fetchOffsets( - partitionsOffsets.keySet(), - ListOffsetsRequest.EARLIEST_TIMESTAMP, - false); + CompletableFuture> fetchOffsetsFuture = + requestManager.fetchOffsets( + timestampsToSearch, + false); assertEquals(1, requestManager.requestsToSend()); assertEquals(0, requestManager.requestsToRetry()); @@ -260,10 +344,10 @@ public void testRequestFailedResponse_NonRetriableAuthError() { // Failed response received NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0); - unsentRequest.future().complete( - buildClientResponseWithErrors( - unsentRequest, - Collections.singletonMap(TEST_PARTITION_1, Errors.TOPIC_AUTHORIZATION_FAILED))); + + ClientResponse clientResponse = buildClientResponseWithErrors( + unsentRequest, Collections.singletonMap(TEST_PARTITION_2, Errors.TOPIC_AUTHORIZATION_FAILED)); + clientResponse.onComplete(); // Request completed with error. Nothing pending to be sent or retried verifyRequestCompletedWithErrorResponse(fetchOffsetsFuture, TopicAuthorizationException.class); @@ -271,13 +355,105 @@ public void testRequestFailedResponse_NonRetriableAuthError() { assertEquals(0, requestManager.requestsToSend()); } - private void verifySuccessfulPollAndResponseReceived(CompletableFuture> actualResult, - Map expectedResult) throws ExecutionException, InterruptedException { + @Test + public void testRequestFailedResponse_NonRetriableErrorTimesout() { + Map timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1, + ListOffsetsRequest.EARLIEST_TIMESTAMP); + + // List offsets request that is successfully built + expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); + CompletableFuture> fetchOffsetsFuture = + requestManager.fetchOffsets( + timestampsToSearch, + false); + assertEquals(1, requestManager.requestsToSend()); + assertEquals(0, requestManager.requestsToRetry()); + + // Request successfully sent + NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds()); + verifySuccessfulPoll(res); + + // Failed response received + NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0); + ClientResponse clientResponse = buildClientResponseWithErrors( + unsentRequest, Collections.singletonMap(TEST_PARTITION_2, Errors.BROKER_NOT_AVAILABLE)); + clientResponse.onComplete(); + + assertFalse(fetchOffsetsFuture.isDone()); + assertThrows(TimeoutException.class, () -> fetchOffsetsFuture.get(100L, TimeUnit.MILLISECONDS)); + + // Request completed with error. Nothing pending to be sent or retried + assertEquals(0, requestManager.requestsToRetry()); + assertEquals(0, requestManager.requestsToSend()); + } + + @Test + public void testRequestFails_AuthenticationException() { + Map timestampsToSearch = Collections.singletonMap(TEST_PARTITION_1, + ListOffsetsRequest.EARLIEST_TIMESTAMP); + + // List offsets request that is successfully built + expectSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); + CompletableFuture> fetchOffsetsFuture = + requestManager.fetchOffsets( + timestampsToSearch, + false); + assertEquals(1, requestManager.requestsToSend()); + assertEquals(0, requestManager.requestsToRetry()); + + // Request successfully sent + NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds()); + verifySuccessfulPoll(res); + + // Failed response received + NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0); + ClientResponse clientResponse = + buildClientResponseWithAuthenticationException(unsentRequest); + clientResponse.onComplete(); + + // Request completed with error. Nothing pending to be sent or retried + verifyRequestCompletedWithErrorResponse(fetchOffsetsFuture, AuthenticationException.class); + assertEquals(0, requestManager.requestsToRetry()); + assertEquals(0, requestManager.requestsToSend()); + } + + private ListOffsetsResponseData.ListOffsetsTopicResponse buildTopicResponse( + TopicPartition tp, + Errors error, + long timestamp, + long offset) { + return new ListOffsetsResponseData.ListOffsetsTopicResponse() + .setName(tp.topic()) + .setPartitions(Collections.singletonList(new ListOffsetsResponseData.ListOffsetsPartitionResponse() + .setPartitionIndex(tp.partition()) + .setErrorCode(error.code()) + .setTimestamp(timestamp) + .setOffset(offset))); + } + + private static Stream retriableErrors() { + return Stream.of( + Arguments.of(Errors.NOT_LEADER_OR_FOLLOWER), + Arguments.of(Errors.REPLICA_NOT_AVAILABLE), + Arguments.of(Errors.KAFKA_STORAGE_ERROR), + Arguments.of(Errors.OFFSET_NOT_AVAILABLE), + Arguments.of(Errors.LEADER_NOT_AVAILABLE), + Arguments.of(Errors.FENCED_LEADER_EPOCH), + Arguments.of(Errors.BROKER_NOT_AVAILABLE), + Arguments.of(Errors.UNKNOWN_LEADER_EPOCH)); + } + + private void verifySuccessfulPollAndResponseReceived( + CompletableFuture> actualResult, + Map expectedResult) throws ExecutionException, + InterruptedException { // Following poll should send the request and get a response NetworkClientDelegate.PollResult retriedPoll = requestManager.poll(time.milliseconds()); verifySuccessfulPoll(retriedPoll); NetworkClientDelegate.UnsentRequest unsentRequest = retriedPoll.unsentRequests.get(0); - unsentRequest.future().complete(buildClientResponseWithOffsets(unsentRequest, expectedResult)); + ClientResponse clientResponse = buildClientResponse(unsentRequest, + expectedResult); + clientResponse.onComplete(); verifyRequestSuccessfullyCompleted(actualResult, expectedResult); } @@ -306,20 +482,24 @@ private void verifySuccessfulPoll(NetworkClientDelegate.PollResult pollResult, assertEquals(requestCount, pollResult.unsentRequests.size()); } - private void verifyRequestSuccessfullyCompleted(CompletableFuture> actualResult, - Map expectedResult) - throws ExecutionException, InterruptedException { + private void verifyRequestSuccessfullyCompleted( + CompletableFuture> actualResult, + Map expectedResult) throws ExecutionException, InterruptedException { assertEquals(0, requestManager.requestsToRetry()); assertEquals(0, requestManager.requestsToSend()); assertTrue(actualResult.isDone()); assertFalse(actualResult.isCompletedExceptionally()); - Map partitionOffsets = actualResult.get(); + Map partitionOffsets = actualResult.get(); assertEquals(expectedResult, partitionOffsets); - verifySubscriptionStateUpdated(expectedResult); + // Validate that the subscription state has been updated for all non-null offsets retrieved + Map validExpectedOffsets = expectedResult.entrySet().stream() + .filter(entry -> entry.getValue() != null) + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + verifySubscriptionStateUpdated(validExpectedOffsets); } - private void verifySubscriptionStateUpdated(Map expectedResult) { + private void verifySubscriptionStateUpdated(Map expectedResult) { ArgumentCaptor tpCaptor = ArgumentCaptor.forClass(TopicPartition.class); ArgumentCaptor offsetCaptor = ArgumentCaptor.forClass(Long.class); @@ -328,11 +508,14 @@ private void verifySubscriptionStateUpdated(Map expectedRe List updatedTp = tpCaptor.getAllValues(); List updatedOffsets = offsetCaptor.getAllValues(); + assertEquals(expectedResult.keySet().size(), updatedOffsets.size()); assertEquals(expectedResult.keySet(), new HashSet<>(updatedTp)); - assertEquals(new ArrayList<>(expectedResult.values()), updatedOffsets); + + assertEquals(expectedResult.values().size(), updatedOffsets.size()); + expectedResult.values().stream().map(offsetAndTimestamp -> updatedOffsets.contains(offsetAndTimestamp.offset())).forEach(Assertions::assertTrue); } - private void verifyRequestCompletedWithErrorResponse(CompletableFuture> actualResult, + private void verifyRequestCompletedWithErrorResponse(CompletableFuture> actualResult, Class expectedFailure) { assertTrue(actualResult.isDone()); assertTrue(actualResult.isCompletedExceptionally()); @@ -357,56 +540,72 @@ private Cluster testClusterMetadata(Map partitionLeaders) Collections.emptySet()); } - private ClientResponse buildClientResponseWithOffsets( + private ClientResponse buildClientResponse( + final NetworkClientDelegate.UnsentRequest request, + final Map partitionsOffsets) { + List topicResponses = new + ArrayList<>(); + partitionsOffsets.forEach((tp, offsetAndTimestamp) -> topicResponses.add(ListOffsetsResponse.singletonListOffsetsTopicResponse( + tp, + Errors.NONE, + offsetAndTimestamp.timestamp(), + offsetAndTimestamp.offset(), + offsetAndTimestamp.leaderEpoch().orElse(ListOffsetsResponse.UNKNOWN_EPOCH)))); + + return buildClientResponse(request, topicResponses, false, null); + } + + private ClientResponse buildClientResponse( final NetworkClientDelegate.UnsentRequest request, - final Map partitionsOffsets) { - return buildClientResponse(request, partitionsOffsets, Collections.emptyMap()); + final List topicResponses) { + + return buildClientResponse(request, topicResponses, false, null); } private ClientResponse buildClientResponseWithErrors( final NetworkClientDelegate.UnsentRequest request, - final Map partitionsErrors) { - return buildClientResponse(request, Collections.emptyMap(), partitionsErrors); + final Map partitionErrors) { + List topicResponses = new ArrayList<>(); + partitionErrors.forEach((tp, error) -> topicResponses.add(ListOffsetsResponse.singletonListOffsetsTopicResponse( + tp, + error, + ListOffsetsResponse.UNKNOWN_TIMESTAMP, + ListOffsetsResponse.UNKNOWN_OFFSET, + ListOffsetsResponse.UNKNOWN_EPOCH))); + + return buildClientResponse(request, topicResponses, false, null); + } + + private ClientResponse buildClientResponseWithAuthenticationException( + final NetworkClientDelegate.UnsentRequest request) { + return buildClientResponse(request, Collections.emptyList(), true, + new AuthenticationException("Authentication failed")); } private ClientResponse buildClientResponse( final NetworkClientDelegate.UnsentRequest request, - final Map partitionsOffsets, - final Map partitionsErrors) { + final List topicResponses, + final boolean disconnected, + final AuthenticationException authenticationException) { AbstractRequest abstractRequest = request.requestBuilder().build(); assertTrue(abstractRequest instanceof ListOffsetsRequest); ListOffsetsRequest offsetFetchRequest = (ListOffsetsRequest) abstractRequest; - ListOffsetsResponse response = buildListOffsetsResponse(partitionsOffsets, - partitionsErrors); + ListOffsetsResponse response = buildListOffsetsResponse(topicResponses); return new ClientResponse( new RequestHeader(ApiKeys.OFFSET_FETCH, offsetFetchRequest.version(), "", 1), request.callback(), "-1", time.milliseconds(), time.milliseconds(), - false, - null, + disconnected, null, + authenticationException, response ); } - private ListOffsetsResponse buildListOffsetsResponse(Map partitionsOffsets, - Map partitionsErrors) { - List offsetsTopicResponses = new ArrayList<>(); - // Generate successful responses - partitionsOffsets.forEach((tp, offset) -> offsetsTopicResponses.add( - ListOffsetsResponse.singletonListOffsetsTopicResponse(tp, Errors.NONE, -1L, - offset, 123))); - - // Generate responses with error - partitionsErrors.forEach((tp, error) -> offsetsTopicResponses.add( - ListOffsetsResponse.singletonListOffsetsTopicResponse( - tp, error, - ListOffsetsResponse.UNKNOWN_TIMESTAMP, - ListOffsetsResponse.UNKNOWN_OFFSET, - ListOffsetsResponse.UNKNOWN_EPOCH))); - + private ListOffsetsResponse buildListOffsetsResponse( + List offsetsTopicResponses) { ListOffsetsResponseData responseData = new ListOffsetsResponseData() .setThrottleTimeMs(0) .setTopics(offsetsTopicResponses);