diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 83672fbc080ad..64fc41d3f9e1a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; @@ -129,7 +128,7 @@ public void maybeAutoCommit(final Map offsets * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an * {@link OffsetCommitRequestState} and enqueue it to send later. */ - public CompletableFuture addOffsetCommitRequest(final Map offsets) { + public CompletableFuture addOffsetCommitRequest(final Map offsets) { return pendingRequests.addOffsetCommitRequest(offsets); } @@ -145,19 +144,13 @@ public void updateAutoCommitTimer(final long currentTimeMs) { this.autoCommitState.ifPresent(t -> t.ack(currentTimeMs)); } - - // Visible for testing - List unsentOffsetFetchRequests() { - return pendingRequests.unsentOffsetFetches; - } - // Visible for testing Queue unsentOffsetCommitRequests() { return pendingRequests.unsentOffsetCommits; } // Visible for testing - CompletableFuture sendAutoCommit(final Map allConsumedOffsets) { + CompletableFuture sendAutoCommit(final Map allConsumedOffsets) { log.debug("Enqueuing autocommit offsets: {}", allConsumedOffsets); return this.addOffsetCommitRequest(allConsumedOffsets) .whenComplete((response, throwable) -> { @@ -182,23 +175,19 @@ private class OffsetCommitRequestState { private final String groupId; private final GroupState.Generation generation; private final String groupInstanceId; - private final NetworkClientDelegate.FutureCompletionHandler future; + private final CompletableFuture future; public OffsetCommitRequestState(final Map offsets, final String groupId, final String groupInstanceId, final GroupState.Generation generation) { this.offsets = offsets; - this.future = new NetworkClientDelegate.FutureCompletionHandler(); + this.future = new CompletableFuture<>(); this.groupId = groupId; this.generation = generation; this.groupInstanceId = groupInstanceId; } - public CompletableFuture future() { - return future.future(); - } - public NetworkClientDelegate.UnsentRequest toUnsentRequest() { Map requestTopicDataMap = new HashMap<>(); for (Map.Entry entry : offsets.entrySet()) { @@ -230,15 +219,20 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { return new NetworkClientDelegate.UnsentRequest( builder, coordinatorRequestManager.coordinator(), - future); + (response, throwable) -> { + if (throwable == null) { + future.complete(null); + } else { + future.completeExceptionally(throwable); + } + }); } } private class OffsetFetchRequestState extends RequestState { public final Set requestedPartitions; public final GroupState.Generation requestedGeneration; - public CompletableFuture> future; - + private final CompletableFuture> future; public OffsetFetchRequestState(final Set partitions, final GroupState.Generation generation, final long retryBackoffMs, @@ -253,19 +247,16 @@ public boolean sameRequest(final OffsetFetchRequestState request) 
{ return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions); } - public NetworkClientDelegate.UnsentRequest toUnsentRequest(final long currentTimeMs) { + public NetworkClientDelegate.UnsentRequest toUnsentRequest() { OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder( groupState.groupId, true, new ArrayList<>(this.requestedPartitions), throwOnFetchStableOffsetUnsupported); - NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest( + return new NetworkClientDelegate.UnsentRequest( builder, - coordinatorRequestManager.coordinator()); - unsentRequest.future().whenComplete((r, t) -> { - onResponse(currentTimeMs, (OffsetFetchResponse) r.responseBody()); - }); - return unsentRequest; + coordinatorRequestManager.coordinator(), + (r, t) -> onResponse(r.receivedTimeMs(), (OffsetFetchResponse) r.responseBody())); } public void onResponse( @@ -359,12 +350,12 @@ private void onSuccess(final long currentTimeMs, } } - private CompletableFuture> chainFuture(final CompletableFuture> future) { + private CompletableFuture> chainFuture(final CompletableFuture> otherFuture) { return this.future.whenComplete((r, t) -> { if (t != null) { - future.completeExceptionally(t); + otherFuture.completeExceptionally(t); } else { - future.complete(r); + otherFuture.complete(r); } }); } @@ -384,8 +375,8 @@ public String toString() { *

<p>This is used to stage the unsent {@link OffsetCommitRequestState} and {@link OffsetFetchRequestState}.
      * <li>unsentOffsetCommits holds the offset commit requests that have not been sent out</li>
      * <li>unsentOffsetFetches holds the offset fetch requests that have not been sent out</li>
-     * <li>inflightOffsetFetches holds the offset fetch requests that have been sent out but incompleted.</li>
+     * <li>inflightOffsetFetches holds the offset fetch requests that have been sent out but not completed.</li>
      *

    * {@code addOffsetFetchRequest} dedupes the requests to avoid sending the same requests. */ @@ -395,11 +386,12 @@ class PendingRequests { List unsentOffsetFetches = new ArrayList<>(); List inflightOffsetFetches = new ArrayList<>(); - public boolean hasUnsentRequests() { + // Visible for testing + boolean hasUnsentRequests() { return !unsentOffsetCommits.isEmpty() || !unsentOffsetFetches.isEmpty(); } - public CompletableFuture addOffsetCommitRequest(final Map offsets) { + CompletableFuture addOffsetCommitRequest(final Map offsets) { // TODO: Dedupe committing the same offsets to the same partitions OffsetCommitRequestState request = new OffsetCommitRequestState( offsets, @@ -407,15 +399,15 @@ public CompletableFuture addOffsetCommitRequest(final MapAdding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future - * to the existing one. + *

<p>Adding an offset fetch request to the outgoing buffer. If the same request was made, we chain the future
+         * to the existing one.
          *
-         * If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches}
-         * upon completion.
+         * <p>

    If the request is new, it invokes a callback to remove itself from the {@code inflightOffsetFetches} + * upon completion. */ private CompletableFuture> addOffsetFetchRequest(final OffsetFetchRequestState request) { Optional dupe = @@ -450,12 +442,11 @@ private CompletableFuture> addOffsetFetch /** * Clear {@code unsentOffsetCommits} and moves all the sendable request in {@code unsentOffsetFetches} to the - * {@code inflightOffsetFetches} to bookkeep all of the inflight requests. - * + * {@code inflightOffsetFetches} to bookkeep all the inflight requests. * Note: Sendable requests are determined by their timer as we are expecting backoff on failed attempt. See * {@link RequestState}. **/ - public List drain(final long currentTimeMs) { + List drain(final long currentTimeMs) { List unsentRequests = new ArrayList<>(); // Add all unsent offset commit requests to the unsentRequests list @@ -472,7 +463,7 @@ public List drain(final long currentTimeMs) // Add all sendable offset fetch requests to the unsentRequests list and to the inflightOffsetFetches list for (OffsetFetchRequestState request : partitionedBySendability.get(true)) { request.onSendAttempt(currentTimeMs); - unsentRequests.add(request.toUnsentRequest(currentTimeMs)); + unsentRequests.add(request.toUnsentRequest()); inflightOffsetFetches.add(request); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java index 241760d4c22dd..c119a6198929c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java @@ -38,8 +38,8 @@ * Whether there is an existing coordinator. * Whether there is an inflight request. * Whether the backoff timer has expired. - * The {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} contains either a wait timer - * or a singleton list of {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}. + * The {@link NetworkClientDelegate.PollResult} contains either a wait timer + * or a singleton list of {@link NetworkClientDelegate.UnsentRequest}. *

    * The {@link FindCoordinatorRequest} will be handled by the {@link #onResponse(long, FindCoordinatorResponse)} callback, which * subsequently invokes {@code onResponse} to handle the exception and response. Note that the coordinator node will be @@ -86,7 +86,7 @@ public CoordinatorRequestManager( * Note that this method does not involve any actual network IO, and it only determines if we need to send a new request or not. * * @param currentTimeMs current time in ms. - * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult}. This will not be {@code null}. + * @return {@link NetworkClientDelegate.PollResult}. This will not be {@code null}. */ @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java index 9b4cd89361fdf..8150f56debbe4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java @@ -77,6 +77,7 @@ public class DefaultBackgroundThread extends KafkaThread { private final RequestManagers requestManagers; // Visible for testing + @SuppressWarnings("ParameterNumber") DefaultBackgroundThread(final Time time, final ConsumerConfig config, final LogContext logContext, @@ -89,7 +90,8 @@ public class DefaultBackgroundThread extends KafkaThread { final GroupState groupState, final CoordinatorRequestManager coordinatorManager, final CommitRequestManager commitRequestManager, - final OffsetsRequestManager offsetsRequestManager) { + final OffsetsRequestManager offsetsRequestManager, + final TopicMetadataRequestManager topicMetadataRequestManager) { super(BACKGROUND_THREAD_NAME, true); this.time = time; this.running = true; @@ -102,9 +104,9 @@ public class DefaultBackgroundThread extends KafkaThread { this.networkClientDelegate = networkClient; this.errorEventHandler = errorEventHandler; this.groupState = groupState; - this.requestManagers = new RequestManagers( offsetsRequestManager, + topicMetadataRequestManager, Optional.ofNullable(coordinatorManager), Optional.ofNullable(commitRequestManager)); } @@ -169,6 +171,9 @@ public DefaultBackgroundThread(final Time time, logContext); CoordinatorRequestManager coordinatorRequestManager = null; CommitRequestManager commitRequestManager = null; + TopicMetadataRequestManager topicMetadataRequestManger = new TopicMetadataRequestManager( + logContext, + config); if (groupState.groupId != null) { coordinatorRequestManager = new CoordinatorRequestManager( @@ -188,15 +193,14 @@ public DefaultBackgroundThread(final Time time, } this.requestManagers = new RequestManagers( - offsetsRequestManager, - Optional.ofNullable(coordinatorRequestManager), - Optional.ofNullable(commitRequestManager)); - + offsetsRequestManager, + topicMetadataRequestManger, + Optional.ofNullable(coordinatorRequestManager), + Optional.ofNullable(commitRequestManager)); this.applicationEventProcessor = new ApplicationEventProcessor( - backgroundEventQueue, - requestManagers, - metadata); - + backgroundEventQueue, + requestManagers, + metadata); } catch (final Exception e) { close(); throw new KafkaException("Failed to construct background processor", e.getCause()); @@ -217,7 +221,7 @@ public void run() { } } catch (final Throwable t) { log.error("The background thread failed 
due to unexpected error", t); - throw new RuntimeException(t); + throw new KafkaException(t); } finally { close(); log.debug("{} closed", getClass()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 445005bf54552..24b6b942481fe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -43,6 +43,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; /** * A wrapper around the {@link org.apache.kafka.clients.NetworkClient} to handle network poll and send operations. @@ -211,17 +212,19 @@ public static class UnsentRequest { private Optional node; // empty if random node can be chosen private Timer timer; - public UnsentRequest(final AbstractRequest.Builder requestBuilder, final Optional node) { - this(requestBuilder, node, new FutureCompletionHandler()); - } - public UnsentRequest(final AbstractRequest.Builder requestBuilder, - final Optional node, - final FutureCompletionHandler handler) { + final Optional node) { Objects.requireNonNull(requestBuilder); this.requestBuilder = requestBuilder; this.node = node; - this.handler = handler; + this.handler = new FutureCompletionHandler(); + } + + public UnsentRequest(final AbstractRequest.Builder requestBuilder, + final Optional node, + final BiConsumer callback) { + this(requestBuilder, node); + this.handler.future.whenComplete(callback); } public void setTimer(final Time time, final long requestTimeoutMs) { @@ -263,10 +266,6 @@ public void onFailure(final RuntimeException e) { future.completeExceptionally(e); } - public CompletableFuture future() { - return future; - } - @Override public void onComplete(final ClientResponse response) { if (response.authenticationException() != null) { @@ -280,5 +279,4 @@ public void onComplete(final ClientResponse response) { } } } - } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java index a846f8c2bb2ce..3a4420a556ba2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java @@ -121,7 +121,7 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState, /** * Determine if there are pending fetch offsets requests to be sent and build a - * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} + * {@link NetworkClientDelegate.PollResult} * containing it. 
*/ @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java index 40b411b6dd576..4fe8fe80c44c3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java @@ -306,7 +306,6 @@ public void commitAsync(Map offsets, OffsetCo commitCallback.onComplete(offsets, null); } }).exceptionally(e -> { - System.out.println(e); throw new KafkaException(e); }); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 0df55a8618428..3864b1fcaa6f9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -33,20 +33,24 @@ public class RequestManagers { public final Optional coordinatorRequestManager; public final Optional commitRequestManager; public final OffsetsRequestManager offsetsRequestManager; + public final TopicMetadataRequestManager topicMetadataRequestManager; private final List> entries; public RequestManagers(OffsetsRequestManager offsetsRequestManager, + TopicMetadataRequestManager topicMetadataRequestManager, Optional coordinatorRequestManager, Optional commitRequestManager) { this.offsetsRequestManager = requireNonNull(offsetsRequestManager, "OffsetsRequestManager cannot be null"); this.coordinatorRequestManager = coordinatorRequestManager; this.commitRequestManager = commitRequestManager; + this.topicMetadataRequestManager = topicMetadataRequestManager; List> list = new ArrayList<>(); list.add(coordinatorRequestManager); list.add(commitRequestManager); list.add(Optional.of(offsetsRequestManager)); + list.add(Optional.of(topicMetadataRequestManager)); entries = Collections.unmodifiableList(list); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java new file mode 100644 index 0000000000000..2b0cbf5dcb0ff --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + *

+ * <p>
+ * Manages the state of topic metadata requests. This manager returns a
+ * {@link NetworkClientDelegate.PollResult} when a request is ready to
+ * be sent. Specifically, this manager handles the following user API calls:
+ * </p>
+ * <ul>
+ * <li>listTopics</li>
+ * <li>partitionsFor</li>
+ * </ul>
+ * <p>
+ * The manager checks the state of the {@link TopicMetadataRequestState} before sending a new one to
+ * prevent sending it without backing off from previous attempts.
+ * It also checks the state of inflight requests to avoid overwhelming the broker with duplicate requests.
+ * The {@code inflightRequests} are memorized by topic name. If all topics are requested, then we use {@code Optional
+ * .empty()} as the key.
+ * Once a request is completed successfully, its corresponding entry is removed.
+ * </p>

    + */ + +public class TopicMetadataRequestManager implements RequestManager { + private final boolean allowAutoTopicCreation; + private final Map, TopicMetadataRequestState> inflightRequests; + private final long retryBackoffMs; + private final long retryBackoffMaxMs; + private final Logger log; + private final LogContext logContext; + + public TopicMetadataRequestManager(final LogContext context, final ConsumerConfig config) { + logContext = context; + log = logContext.logger(getClass()); + inflightRequests = new HashMap<>(); + retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); + allowAutoTopicCreation = config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG); + } + + @Override + public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { + List requests = inflightRequests.values().stream() + .map(req -> req.send(currentTimeMs)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + return requests.isEmpty() ? + new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()) : + new NetworkClientDelegate.PollResult(0, Collections.unmodifiableList(requests)); + } + + /** + * return the future of the metadata request. Return the existing future if a request for the same topic is already + * inflight. + * + * @param topic to be requested. If empty, return the metadata for all topics. + * @return the future of the metadata request. + */ + public CompletableFuture>> requestTopicMetadata(final Optional topic) { + if (inflightRequests.containsKey(topic)) { + return inflightRequests.get(topic).future; + } + + TopicMetadataRequestState newRequest = new TopicMetadataRequestState( + logContext, + topic, + retryBackoffMs, + retryBackoffMaxMs); + inflightRequests.put(topic, newRequest); + return newRequest.future; + } + + // Visible for testing + List inflightRequests() { + return new ArrayList<>(inflightRequests.values()); + } + + class TopicMetadataRequestState extends RequestState { + private final Optional topic; + CompletableFuture>> future; + + public TopicMetadataRequestState(final LogContext logContext, + final Optional topic, + final long retryBackoffMs, + final long retryBackoffMaxMs) { + super(logContext, TopicMetadataRequestState.class.getSimpleName(), retryBackoffMs, + retryBackoffMaxMs); + future = new CompletableFuture<>(); + this.topic = topic; + } + + /** + * prepare the metadata request and return an + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest} if needed. 
+ */ + private Optional send(final long currentTimeMs) { + if (!canSendRequest(currentTimeMs)) { + return Optional.empty(); + } + onSendAttempt(currentTimeMs); + + final MetadataRequest.Builder request = + topic.map(t -> new MetadataRequest.Builder(Collections.singletonList(t), allowAutoTopicCreation)) + .orElseGet(MetadataRequest.Builder::allTopics); + + return Optional.of(createUnsentRequest(request)); + } + + private NetworkClientDelegate.UnsentRequest createUnsentRequest( + final MetadataRequest.Builder request) { + return new NetworkClientDelegate.UnsentRequest( + request, + Optional.empty(), + this::processResponseOrException + ); + } + + private void processResponseOrException(final ClientResponse response, + final Throwable exception) { + if (exception == null) { + handleResponse(response, response.receivedTimeMs()); + return; + } + + if (exception instanceof RetriableException) { + // We continue to retry on RetriableException + // TODO: TimeoutException will continue to retry despite user API timeout. + onFailedAttempt(response.receivedTimeMs()); + } else { + completeFutureAndRemoveRequest(new KafkaException(exception)); + } + } + + private void handleResponse(final ClientResponse response, final long responseTimeMs) { + try { + Map> res = handleTopicMetadataResponse((MetadataResponse) response.responseBody()); + future.complete(res); + inflightRequests.remove(topic); + } catch (RetriableException e) { + onFailedAttempt(responseTimeMs); + } catch (Exception t) { + completeFutureAndRemoveRequest(t); + } + } + + private void completeFutureAndRemoveRequest(final Throwable throwable) { + future.completeExceptionally(throwable); + inflightRequests.remove(topic); + } + + private Map> handleTopicMetadataResponse(final MetadataResponse response) { + Cluster cluster = response.buildCluster(); + + final Set unauthorizedTopics = cluster.unauthorizedTopics(); + if (!unauthorizedTopics.isEmpty()) + throw new TopicAuthorizationException(unauthorizedTopics); + + Map errors = response.errors(); + if (!errors.isEmpty()) { + // if there were errors, we need to check whether they were fatal or whether + // we should just retry + + log.debug("Topic metadata fetch included errors: {}", errors); + + for (Map.Entry errorEntry : errors.entrySet()) { + String topic = errorEntry.getKey(); + Errors error = errorEntry.getValue(); + + if (error == Errors.INVALID_TOPIC_EXCEPTION) + throw new InvalidTopicException("Topic '" + topic + "' is invalid"); + else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) + // if a requested topic is unknown, we just continue and let it be absent + // in the returned map + continue; + else if (error.exception() instanceof RetriableException) { + throw error.exception(); + } else + throw new KafkaException("Unexpected error fetching metadata for topic " + topic, + error.exception()); + } + } + + HashMap> topicsPartitionInfos = new HashMap<>(); + for (String topic : cluster.topics()) + topicsPartitionInfos.put(topic, cluster.partitionsForTopic(topic)); + return topicsPartitionInfos; + } + + public Optional topic() { + return topic; + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index 65ba01959cd57..bf9bb1d496217 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -25,7 +25,7 @@ public abstract class ApplicationEvent { public enum Type { NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE, - LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, + LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA } private final Type type; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 234a228ba4f7b..f2f558600044a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -21,10 +21,13 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -59,6 +62,8 @@ public boolean process(final ApplicationEvent event) { return process((NewTopicsMetadataUpdateRequestEvent) event); case ASSIGNMENT_CHANGE: return process((AssignmentChangeApplicationEvent) event); + case TOPIC_METADATA: + return process((TopicMetadataApplicationEvent) event); case LIST_OFFSETS: return process((ListOffsetsApplicationEvent) event); case RESET_POSITIONS: @@ -100,13 +105,7 @@ private boolean process(final CommitApplicationEvent event) { } CommitRequestManager manager = requestManagers.commitRequestManager.get(); - manager.addOffsetCommitRequest(event.offsets()).whenComplete((r, e) -> { - if (e != null) { - event.future().completeExceptionally(e); - return; - } - event.future().complete(null); - }); + event.chain(manager.addOffsetCommitRequest(event.offsets())); return true; } @@ -153,4 +152,11 @@ private boolean processValidatePositionsEvent() { requestManagers.offsetsRequestManager.validatePositionsIfNeeded(); return true; } + + private boolean process(final TopicMetadataApplicationEvent event) { + final CompletableFuture>> future = + this.requestManagers.topicMetadataRequestManager.requestTopicMetadata(Optional.of(event.topic())); + event.chain(future); + return true; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java index 5f9bad09326ce..48e62fe78bfae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java @@ -29,6 +29,7 @@ public class CommitApplicationEvent extends CompletableApplicationEvent { public CommitApplicationEvent(final Map offsets) { super(Type.COMMIT); this.offsets = Collections.unmodifiableMap(offsets); + for (OffsetAndMetadata offsetAndMetadata : offsets.values()) { if (offsetAndMetadata.offset() < 0) { throw new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()); diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java new file mode 100644 index 0000000000000..6486fe60c47a3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.common.PartitionInfo; + +import java.util.List; +import java.util.Map; + +public class TopicMetadataApplicationEvent extends CompletableApplicationEvent>> { + private final String topic; + public TopicMetadataApplicationEvent(final String topic) { + super(Type.TOPIC_METADATA); + this.topic = topic; + } + + public String topic() { + return topic; + } + + @Override + public String toString() { + return "TopicMetadataApplicationEvent(topic=" + topic + ")"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof TopicMetadataApplicationEvent)) return false; + if (!super.equals(o)) return false; + + TopicMetadataApplicationEvent that = (TopicMetadataApplicationEvent) o; + + return topic.equals(that.topic); + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + topic.hashCode(); + return result; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 66e6ab760d224..8b2e08bd5207a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -134,7 +134,7 @@ public void testPoll_EnsureCorrectInflightRequestBufferSize() { offsets2.put(new TopicPartition("test", 4), new OffsetAndMetadata(20L)); // Add the requests to the CommitRequestManager and store their futures - ArrayList> commitFutures = new ArrayList<>(); + ArrayList> commitFutures = new ArrayList<>(); ArrayList>> fetchFutures = new ArrayList<>(); commitFutures.add(commitManager.addOffsetCommitRequest(offsets1)); fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new TopicPartition("test", 0)))); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java index b5a1ced617ab1..137bd106d6ece 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThreadTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicAuthorizationException; @@ -68,6 +69,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@SuppressWarnings("ClassDataAbstractionCoupling") public class DefaultBackgroundThreadTest { private static final long RETRY_BACKOFF_MS = 100; private final Properties properties = new Properties(); @@ -83,6 +85,7 @@ public class DefaultBackgroundThreadTest { private final int requestTimeoutMs = 500; private GroupState groupState; private CommitRequestManager commitManager; + private TopicMetadataRequestManager topicMetadataRequestManager; @BeforeEach @SuppressWarnings("unchecked") @@ -107,6 +110,7 @@ public void setup() { true); this.groupState = new GroupState(rebalanceConfig); this.commitManager = mock(CommitRequestManager.class); + this.topicMetadataRequestManager = mock(TopicMetadataRequestManager.class); } @Test @@ -114,6 +118,7 @@ public void testStartupAndTearDown() throws InterruptedException { when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); DefaultBackgroundThread backgroundThread = mockBackgroundThread(); backgroundThread.start(); TestUtils.waitForCondition(backgroundThread::isRunning, "Failed awaiting for the background thread to be running"); @@ -125,9 +130,10 @@ public void testStartupAndTearDown() throws InterruptedException { public void testApplicationEvent() { this.applicationEventsQueue = new LinkedBlockingQueue<>(); this.backgroundEventsQueue = new LinkedBlockingQueue<>(); - when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); - when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); - when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); + when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); + when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); DefaultBackgroundThread backgroundThread = mockBackgroundThread(); ApplicationEvent e = new NoopApplicationEvent("noop event"); this.applicationEventsQueue.add(e); @@ -144,9 +150,10 @@ public void testMetadataUpdateEvent() { this.backgroundEventsQueue, mockRequestManagers(), metadata); - when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); - when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); - 
when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); + when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); + when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); DefaultBackgroundThread backgroundThread = mockBackgroundThread(); ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent(); this.applicationEventsQueue.add(e); @@ -159,9 +166,10 @@ public void testMetadataUpdateEvent() { public void testCommitEvent() { this.applicationEventsQueue = new LinkedBlockingQueue<>(); this.backgroundEventsQueue = new LinkedBlockingQueue<>(); - when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); - when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); - when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); + when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); + when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); DefaultBackgroundThread backgroundThread = mockBackgroundThread(); ApplicationEvent e = new CommitApplicationEvent(new HashMap<>()); this.applicationEventsQueue.add(e); @@ -176,6 +184,7 @@ public void testListOffsetsEventIsProcessed() { when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); this.applicationEventsQueue = new LinkedBlockingQueue<>(); this.backgroundEventsQueue = new LinkedBlockingQueue<>(); DefaultBackgroundThread backgroundThread = mockBackgroundThread(); @@ -193,6 +202,7 @@ public void testResetPositionsEventIsProcessed() { when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); this.applicationEventsQueue = new LinkedBlockingQueue<>(); this.backgroundEventsQueue = new LinkedBlockingQueue<>(); DefaultBackgroundThread backgroundThread = mockBackgroundThread(); @@ -233,6 +243,7 @@ public void testValidatePositionsEventIsProcessed() { when(coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); when(commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); when(offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); this.applicationEventsQueue = new LinkedBlockingQueue<>(); this.backgroundEventsQueue = new LinkedBlockingQueue<>(); DefaultBackgroundThread backgroundThread = mockBackgroundThread(); @@ -275,7 +286,7 @@ public void testAssignmentChangeEvent() { this.applicationEventProcessor = spy(new ApplicationEventProcessor( 
this.backgroundEventsQueue, mockRequestManagers(), - metadata)); + metadata)); DefaultBackgroundThread backgroundThread = mockBackgroundThread(); HashMap offset = mockTopicPartitionOffset(); @@ -287,6 +298,7 @@ public void testAssignmentChangeEvent() { when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); backgroundThread.runOnce(); verify(applicationEventProcessor).process(any(AssignmentChangeApplicationEvent.class)); @@ -303,12 +315,27 @@ void testFindCoordinator() { when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); backgroundThread.runOnce(); Mockito.verify(coordinatorManager, times(1)).poll(anyLong()); Mockito.verify(networkClient, times(1)).poll(anyLong(), anyLong()); backgroundThread.close(); } + @Test + void testFetchTopicMetadata() { + this.applicationEventsQueue = new LinkedBlockingQueue<>(); + DefaultBackgroundThread backgroundThread = mockBackgroundThread(); + when(this.coordinatorManager.poll(anyLong())).thenReturn(mockPollCoordinatorResult()); + when(this.commitManager.poll(anyLong())).thenReturn(mockPollCommitResult()); + when(this.offsetsRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + when(this.topicMetadataRequestManager.poll(anyLong())).thenReturn(emptyPollOffsetsRequestResult()); + this.applicationEventsQueue.add(new TopicMetadataApplicationEvent("topic")); + backgroundThread.runOnce(); + verify(applicationEventProcessor).process(any(TopicMetadataApplicationEvent.class)); + backgroundThread.close(); + } + @Test void testPollResultTimer() { DefaultBackgroundThread backgroundThread = mockBackgroundThread(); @@ -336,6 +363,7 @@ private HashMap mockTopicPartitionOffset() { private RequestManagers mockRequestManagers() { return new RequestManagers( offsetsRequestManager, + topicMetadataRequestManager, Optional.of(coordinatorManager), Optional.of(commitManager)); } @@ -372,19 +400,20 @@ private DefaultBackgroundThread mockBackgroundThread() { this.groupState, this.coordinatorManager, this.commitManager, - this.offsetsRequestManager); + this.offsetsRequestManager, + this.topicMetadataRequestManager); } private NetworkClientDelegate.PollResult mockPollCoordinatorResult() { return new NetworkClientDelegate.PollResult( - RETRY_BACKOFF_MS, - Collections.singletonList(findCoordinatorUnsentRequest(time, requestTimeoutMs))); + RETRY_BACKOFF_MS, + Collections.singletonList(findCoordinatorUnsentRequest(time, requestTimeoutMs))); } private NetworkClientDelegate.PollResult mockPollCommitResult() { return new NetworkClientDelegate.PollResult( - RETRY_BACKOFF_MS, - Collections.singletonList(findCoordinatorUnsentRequest(time, requestTimeoutMs))); + RETRY_BACKOFF_MS, + Collections.singletonList(findCoordinatorUnsentRequest(time, requestTimeoutMs))); } private NetworkClientDelegate.PollResult emptyPollOffsetsRequestResult() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java new file mode 100644 index 0000000000000..7f0fcd84c43e6 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.NetworkException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.spy; + +public class TopicMetadataRequestManagerTest { + private MockTime time; + private TopicMetadataRequestManager topicMetadataRequestManager; + + @BeforeEach + public void setup() { + this.time = new MockTime(); + Properties props = new Properties(); + props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 100); 
+ props.put(ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + this.topicMetadataRequestManager = spy(new TopicMetadataRequestManager( + new LogContext(), + new ConsumerConfig(props))); + } + + @ParameterizedTest + @MethodSource("topicsProvider") + public void testPoll_SuccessfulRequestTopicMetadata(Optional topic) { + this.topicMetadataRequestManager.requestTopicMetadata(topic); + this.time.sleep(100); + NetworkClientDelegate.PollResult res = this.topicMetadataRequestManager.poll(this.time.milliseconds()); + assertEquals(1, res.unsentRequests.size()); + } + + @ParameterizedTest + @MethodSource("exceptionProvider") + public void testExceptionAndInflightRequests(final Errors error, final boolean shouldRetry) { + String topic = "hello"; + this.topicMetadataRequestManager.requestTopicMetadata(Optional.of("hello")); + this.time.sleep(100); + NetworkClientDelegate.PollResult res = this.topicMetadataRequestManager.poll(this.time.milliseconds()); + res.unsentRequests.get(0).future().complete(buildTopicMetadataClientResponse( + res.unsentRequests.get(0), + Optional.of(topic), + error)); + List inflights = this.topicMetadataRequestManager.inflightRequests(); + + if (shouldRetry) { + assertEquals(1, inflights.size()); + assertEquals(topic, inflights.get(0).topic().orElse(null)); + } else { + assertEquals(0, inflights.size()); + } + } + + @ParameterizedTest + @MethodSource("topicsProvider") + public void testSendingTheSameRequest(Optional topic) { + CompletableFuture>> future = this.topicMetadataRequestManager.requestTopicMetadata(topic); + CompletableFuture>> future2 = + this.topicMetadataRequestManager.requestTopicMetadata(topic); + this.time.sleep(100); + NetworkClientDelegate.PollResult res = this.topicMetadataRequestManager.poll(this.time.milliseconds()); + assertEquals(1, res.unsentRequests.size()); + + res.unsentRequests.get(0).future().complete(buildTopicMetadataClientResponse( + res.unsentRequests.get(0), + topic, + Errors.NONE)); + + assertTrue(future.isDone()); + assertFalse(future.isCompletedExceptionally()); + try { + future.get(); + } catch (Throwable e) { + fail("Expecting to succeed, but got: {}", e); + } + assertTrue(future2.isDone()); + assertFalse(future2.isCompletedExceptionally()); + } + + @ParameterizedTest + @MethodSource("hardFailureExceptionProvider") + void testHardFailures(Exception exception) { + Optional topic = Optional.of("hello"); + + this.topicMetadataRequestManager.requestTopicMetadata(topic); + NetworkClientDelegate.PollResult res = this.topicMetadataRequestManager.poll(this.time.milliseconds()); + assertEquals(1, res.unsentRequests.size()); + + res.unsentRequests.get(0).future().completeExceptionally(exception); + + if (exception instanceof RetriableException) { + assertFalse(topicMetadataRequestManager.inflightRequests().isEmpty()); + } else { + assertTrue(topicMetadataRequestManager.inflightRequests().isEmpty()); + } + } + + private ClientResponse buildTopicMetadataClientResponse( + final NetworkClientDelegate.UnsentRequest request, + final Optional topic, + final Errors error) { + AbstractRequest abstractRequest = request.requestBuilder().build(); + assertTrue(abstractRequest instanceof MetadataRequest); + MetadataRequest metadataRequest = (MetadataRequest) abstractRequest; + Cluster cluster = mockCluster(3, 0); + List topics = new ArrayList<>(); + if (topic.isPresent()) { + topics.add(new MetadataResponse.TopicMetadata(error, 
topic.get(), false, + Collections.emptyList())); + } else { + // null topic means request for all topics + topics.add(new MetadataResponse.TopicMetadata(error, "topic1", false, + Collections.emptyList())); + topics.add(new MetadataResponse.TopicMetadata(error, "topic2", false, + Collections.emptyList())); + } + final MetadataResponse metadataResponse = RequestTestUtils.metadataResponse(cluster.nodes(), + cluster.clusterResource().clusterId(), + cluster.controller().id(), + topics); + return new ClientResponse( + new RequestHeader(ApiKeys.METADATA, metadataRequest.version(), "mockClientId", 1), + request.callback(), + "-1", + time.milliseconds(), + time.milliseconds(), + false, + null, + null, + metadataResponse); + } + + private static Cluster mockCluster(final int numNodes, final int controllerIndex) { + HashMap nodes = new HashMap<>(); + for (int i = 0; i < numNodes; i++) + nodes.put(i, new Node(i, "localhost", 8121 + i)); + return new Cluster("mockClusterId", nodes.values(), + Collections.emptySet(), Collections.emptySet(), + Collections.emptySet(), nodes.get(controllerIndex)); + } + + + private static Collection topicsProvider() { + return Arrays.asList( + Arguments.of(Optional.of("topic1")), + Arguments.of(Optional.empty())); + } + + private static Collection exceptionProvider() { + return Arrays.asList( + Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false), + Arguments.of(Errors.INVALID_TOPIC_EXCEPTION, false), + Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false), + Arguments.of(Errors.NETWORK_EXCEPTION, true), + Arguments.of(Errors.NONE, false)); + } + + private static Collection hardFailureExceptionProvider() { + return Arrays.asList( + Arguments.of(new TimeoutException("timeout")), + Arguments.of(new KafkaException("non-retriable exception")), + Arguments.of(new NetworkException("retriable-exception"))); + } + +} diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index d8f2144b3e353..5d1d7dc5ec793 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -1314,7 +1314,8 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws } } - private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + // Visible for testing + int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { return indexCache.lookupOffset(remoteLogSegmentMetadata, offset); } @@ -1398,7 +1399,8 @@ private Optional findNextSegmentMetadata(RemoteLogSegm : Optional.empty(); } - private RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { + // Visible for testing + RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { RecordBatch nextBatch; // Look for the batch which has the desired offset // We will always have a batch in that segment as it is a non-compacted topic. 
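Context for the test changes below: the two RemoteLogManager methods above were widened from private to package-private so tests can stub them. A minimal sketch of the sequential scan that findFirstBatch performs, written against only the public RemoteLogInputStream.nextBatch() and RecordBatch.lastOffset() APIs (illustrative only, not the committed implementation):

    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.RemoteLogInputStream;

    import java.io.IOException;

    final class FindFirstBatchSketch {
        // Read batches sequentially from the remote segment stream until we reach
        // one whose last offset is at or past the target offset. Returns null when
        // the stream is exhausted first; that is the "missing first batch" case
        // which testReadForMissingFirstBatchInRemote below simulates by overriding
        // findFirstBatch to return null.
        static RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
            RecordBatch nextBatch;
            do {
                nextBatch = remoteLogInputStream.nextBatch();
            } while (nextBatch != null && nextBatch.lastOffset() < offset);
            return nextBatch;
        }
    }
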
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index bb66994b273b4..5c0578d359faf 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -34,7 +34,10 @@ import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RemoteLogInputStream; import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -58,15 +61,20 @@ import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.internals.log.EpochEntry; +import org.apache.kafka.storage.internals.log.FetchDataInfo; +import org.apache.kafka.storage.internals.log.FetchIsolation; import org.apache.kafka.storage.internals.log.LazyIndex; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.internals.log.OffsetIndex; import org.apache.kafka.storage.internals.log.ProducerStateManager; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import org.apache.kafka.storage.internals.log.TimeIndex; import org.apache.kafka.storage.internals.log.TransactionIndex; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.MockedConstruction; @@ -76,7 +84,9 @@ import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; @@ -89,6 +99,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.Optional; +import java.util.OptionalInt; import java.util.Properties; import java.util.Set; import java.util.TreeMap; @@ -121,6 +132,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -1731,6 +1743,146 @@ private Map truncateAndGetLeaderEpochs(List entries, return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } + @Test + public void testReadForMissingFirstBatchInRemote() throws RemoteStorageException, IOException { + FileInputStream fileInputStream = mock(FileInputStream.class); + ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); + RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); + LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); + when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) + .thenAnswer(a -> 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testReadForFirstBatchMoreThanMaxFetchBytes(boolean minOneMessage) throws RemoteStorageException, IOException {
+        FileInputStream fileInputStream = mock(FileInputStream.class);
+        ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+        RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+        LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+        when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+                .thenAnswer(a -> fileInputStream);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        int fetchOffset = 0;
+        int fetchMaxBytes = 10;
+        int recordBatchSizeInBytes = fetchMaxBytes + 1;
+        RecordBatch firstBatch = mock(RecordBatch.class);
+        ArgumentCaptor<ByteBuffer> capture = ArgumentCaptor.forClass(ByteBuffer.class);
+
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty()
+        );
+
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
+                0, minOneMessage, tp, partitionData, FetchIsolation.HIGH_WATERMARK, false
+        );
+
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(
+                remoteLogManagerConfig,
+                brokerId,
+                logDir,
+                clusterId,
+                time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> { },
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return rsmManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+
+            public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition,
+                                                                                    int epochForOffset,
+                                                                                    long offset) {
+                return Optional.of(segmentMetadata);
+            }
+
+            int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+                return 1;
+            }
+
+            RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
+                when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
+                doNothing().when(firstBatch).writeTo(capture.capture());
+                return firstBatch;
+            }
+        }) {
+            FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
+            // Common assertions
+            assertEquals(fetchOffset, fetchDataInfo.fetchOffsetMetadata.messageOffset);
+            assertFalse(fetchDataInfo.firstEntryIncomplete);
+            // FetchIsolation is HIGH_WATERMARK
+            assertEquals(Optional.empty(), fetchDataInfo.abortedTransactions);
+
+            if (minOneMessage) {
+                // Verify that the byte buffer has capacity equal to the size of the first batch
+                assertEquals(recordBatchSizeInBytes, capture.getValue().capacity());
+            } else {
+                // Verify that the first batch is never written to the buffer
+                verify(firstBatch, never()).writeTo(any(ByteBuffer.class));
+                assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records);
+            }
+        }
+    }
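This parameterized test nails down the minOneMessage rule for a first batch (11 bytes) larger than fetchMaxBytes (10): with minOneMessage the buffer must grow to the batch size so the consumer can still make progress, otherwise the batch is never copied and empty records are returned. A hedged sketch of that sizing decision, using illustrative names rather than the actual RemoteLogManager internals:

    // Sketch of the sizing rule the test verifies; names are illustrative.
    private ByteBuffer sizeFetchBufferSketch(RecordBatch firstBatch, int fetchMaxBytes, boolean minOneMessage) {
        int firstBatchSize = firstBatch.sizeInBytes();
        if (firstBatchSize > fetchMaxBytes && !minOneMessage) {
            // Batch does not fit and we may return nothing: the caller answers
            // with MemoryRecords.EMPTY and the batch is never written anywhere.
            return null;
        }
        // Grow the buffer so at least the first batch fits whole.
        return ByteBuffer.allocate(Math.max(fetchMaxBytes, firstBatchSize));
    }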
+
     private Partition mockPartition(TopicIdPartition topicIdPartition) {
         TopicPartition tp = topicIdPartition.topicPartition();
         Partition partition = mock(Partition.class);
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 0d1106db05f14..9c6a2089db4a8 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -378,7 +377,7 @@ private void initializeResources() {
         long startTimeMs = time.milliseconds();
         Admin adminClient = null;
         try {
-            adminClient = AdminClient.create(rlmmConfig.commonProperties());
+            adminClient = Admin.create(rlmmConfig.commonProperties());
 
             // Stop if it is already initialized or closing.
             while (!(initialized.get() || closing.get())) {
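This hunk, and the two Streams integration tests further below, make the same substitution: create clients through the Admin interface's static factory rather than the AdminClient abstract class, so callers depend only on the public interface. Typical usage after the change looks like the following sketch (helper name and bootstrap address are illustrative):

    // Program against the Admin interface and construct via its factory.
    // Admin extends AutoCloseable, so try-with-resources closes it cleanly.
    static Set<String> listTopicNames() throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            return admin.listTopics().names().get();
        }
    }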
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 2ff4eea53c1ee..ddd9f40b533fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -777,7 +777,7 @@ public class StreamsConfig extends AbstractConfig {
     @SuppressWarnings("WeakerAccess")
     public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG = "rack.aware.assignment.non_overlap_cost";
     public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC = "Cost associated with moving tasks from existing assignment. This config and " + RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG + " controls whether the "
-        + "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment.  If set a larger value " + RackAwareTaskAssignor.class.getName() + " will "
+        + "optimization algorithm favors minimizing cross-rack traffic or minimizing the movement of tasks in the existing assignment. If set to a larger value, " + RackAwareTaskAssignor.class.getName() + " will "
         + "optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors.";
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index 78734b68bb397..f086b720e9384 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -757,7 +757,7 @@ public void shouldGoThroughRebalancingCorrectly() throws Exception {
     }
 
     private int getNumberOfPartitionsForTopic(final String topic) throws Exception {
-        try (final AdminClient adminClient = createAdminClient()) {
+        try (final Admin adminClient = createAdminClient()) {
             final TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(topic))
                 .topicNameValues()
                 .get(topic)
@@ -768,7 +768,7 @@ private int getNumberOfPartitionsForTopic(final String topic) throws Exception {
     }
 
     private boolean topicExists(final String topic) throws Exception {
-        try (final AdminClient adminClient = createAdminClient()) {
+        try (final Admin adminClient = createAdminClient()) {
             final Set<String> topics = adminClient.listTopics()
                 .names()
                 .get();
@@ -781,11 +781,11 @@ private String toRepartitionTopicName(final String input) {
         return applicationId + "-" + input + "-repartition";
     }
 
-    private static AdminClient createAdminClient() {
+    private static Admin createAdminClient() {
         final Properties properties = new Properties();
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
-        return AdminClient.create(properties);
+        return Admin.create(properties);
     }
 
     private static int countOccurrencesInTopology(final String topologyString,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
index 38dc3a6ffc1f1..282ceeed0ecfe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
-import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -206,7 +206,7 @@ private KafkaStreams startStreams(final StreamsBuilder builder) throws Interrupt
     }
 
     private int getNumberOfPartitionsForTopic(final String topic) throws Exception {
-        try (final AdminClient adminClient = createAdminClient()) {
+        try (final Admin adminClient = createAdminClient()) {
             final TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(topic))
                 .topicNameValues()
                 .get(topic)
@@ -262,10 +262,10 @@ private void validateReceivedMessages(final String topic,
         );
     }
 
-    private static AdminClient createAdminClient() {
+    private static Admin createAdminClient() {
         final Properties properties = new Properties();
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
-        return AdminClient.create(properties);
+        return Admin.create(properties);
     }
 }