From 6744a718c2c177c7d462b231ea5e476d98f6eb38 Mon Sep 17 00:00:00 2001 From: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Sun, 15 Sep 2024 13:43:45 -0400 Subject: [PATCH] KAFKA-17066 new consumer updateFetchPositions all in background thread (#16885) Fix for the known issue that the logic for updating fetch positions in the new consumer was being performed partly in the app thread, partly in the background thread, potentially leading to race conditions on the subscription state. This PR moves the logic for updateFetchPositions to the background thread as a single event (instead of triggering separate events to validate, fetchOffsets, listOffsets). A new UpdateFetchPositionsEvent is triggered from the app thread and processed in the background, where it performs those same operations and updates the subscription state accordingly, without blocking the background thread. This PR maintains the existing logic for keeping a pendingOffsetFetchRequest that does not complete within the lifetime of the updateFetchPositions attempt, and may be used on the next call to updateFetchPositions. Reviewers: Andrew Schofield, Chia-Ping Tsai --- .../internals/AsyncKafkaConsumer.java | 125 +------ .../consumer/internals/ConsumerUtils.java | 8 +- .../internals/OffsetsRequestManager.java | 325 ++++++++++++++++-- .../consumer/internals/RequestManagers.java | 34 +- .../internals/events/ApplicationEvent.java | 2 +- .../events/ApplicationEventHandler.java | 7 + .../events/ApplicationEventProcessor.java | 21 +- ...java => CheckAndUpdatePositionsEvent.java} | 17 +- .../events/ValidatePositionsEvent.java | 30 -- .../internals/AsyncKafkaConsumerTest.java | 237 +------------ .../internals/OffsetsRequestManagerTest.java | 148 ++++++-- .../events/ApplicationEventProcessorTest.java | 14 +- 12 files changed, 491 insertions(+), 477 deletions(-) rename clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/{ResetPositionsEvent.java => CheckAndUpdatePositionsEvent.java} (52%) delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 06ee3bc3616d2..b88a25f907b3b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -42,6 +42,7 @@ import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent; import org.apache.kafka.clients.consumer.internals.events.CommitEvent; import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; @@ -54,13 +55,11 @@ import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; -import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent; import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent; import 
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; -import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent; import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics; import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager; import org.apache.kafka.common.Cluster; @@ -130,7 +129,6 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState; -import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets; import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.apache.kafka.common.utils.Utils.closeQuietly; import static org.apache.kafka.common.utils.Utils.isBlank; @@ -254,8 +252,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); private final AtomicInteger refCount = new AtomicInteger(0); - private FetchCommittedOffsetsEvent pendingOffsetFetchEvent; - AsyncKafkaConsumer(final ConsumerConfig config, final Deserializer keyDeserializer, final Deserializer valueDeserializer) { @@ -1576,48 +1572,28 @@ private Fetch collectFetch() { return fetch; } + /** * Set the fetch position to the committed position (if there is one) * or reset it using the offset reset policy the user has configured. * - * @throws AuthenticationException If authentication fails. See the exception for more details - * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is - * defined * @return true iff the operation completed without timing out + * @throws AuthenticationException If authentication fails. See the exception for more details + * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is + * defined */ private boolean updateFetchPositions(final Timer timer) { + cachedSubscriptionHasAllFetchPositions = false; try { - // Validate positions using the partition leader end offsets, to detect if any partition - // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch - // request, retrieve the partition end offsets, and validate the current position against it. - applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer))); - - cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions(); - if (cachedSubscriptionHasAllFetchPositions) return true; - - // Reset positions using committed offsets retrieved from the group coordinator, for any - // partitions which do not have a valid position and are not awaiting reset. This will - // trigger an OffsetFetch request and update positions with the offsets retrieved. This - // will only do a coordinator lookup if there are partitions which have missing - // positions, so a consumer with manually assigned partitions can avoid a coordinator - // dependence by always ensuring that assigned partitions have an initial position. 
- if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer)) - return false; - - // If there are partitions still needing a position and a reset policy is defined, - // request reset using the default policy. If no reset strategy is defined and there - // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception. - subscriptions.resetInitializingPositions(); - - // Reset positions using partition offsets retrieved from the leader, for any partitions - // which are awaiting reset. This will trigger a ListOffset request, retrieve the - // partition offsets according to the strategy (ex. earliest, latest), and update the - // positions. - applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer))); - return true; + CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer)); + wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future()); + cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent); } catch (TimeoutException e) { return false; + } finally { + wakeupTrigger.clearTask(); } + return true; } /** @@ -1629,79 +1605,6 @@ private boolean isCommittedOffsetsManagementEnabled() { return groupMetadata.get().isPresent(); } - /** - * Refresh the committed offsets for partitions that require initialization. - * - * @param timer Timer bounding how long this method can block - * @return true iff the operation completed within the timeout - */ - private boolean initWithCommittedOffsetsIfNeeded(Timer timer) { - final Set initializingPartitions = subscriptions.initializingPartitions(); - - if (initializingPartitions.isEmpty()) - return true; - - log.debug("Refreshing committed offsets for partitions {}", initializingPartitions); - - // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle - // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created - // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the - // case it times out, subsequent attempts will also use the event in order to wait for the results. - if (!canReusePendingOffsetFetchEvent(initializingPartitions)) { - // Give the event a reasonable amount of time to complete. - final long timeoutMs = Math.max(defaultApiTimeoutMs, timer.remainingMs()); - final long deadlineMs = calculateDeadlineMs(time, timeoutMs); - pendingOffsetFetchEvent = new FetchCommittedOffsetsEvent(initializingPartitions, deadlineMs); - applicationEventHandler.add(pendingOffsetFetchEvent); - } - - final CompletableFuture> future = pendingOffsetFetchEvent.future(); - - try { - wakeupTrigger.setActiveTask(future); - final Map offsets = ConsumerUtils.getResult(future, timer); - - // Clear the pending event once its result is successfully retrieved. 
- pendingOffsetFetchEvent = null; - - refreshCommittedOffsets(offsets, metadata, subscriptions); - return true; - } catch (TimeoutException e) { - log.debug( - "The committed offsets for the following partition(s) could not be refreshed within the timeout: {} ", - initializingPartitions - ); - return false; - } catch (InterruptException e) { - throw e; - } catch (Throwable t) { - pendingOffsetFetchEvent = null; - throw ConsumerUtils.maybeWrapAsKafkaException(t); - } finally { - wakeupTrigger.clearTask(); - } - } - - /** - * This determines if the {@link #pendingOffsetFetchEvent pending offset fetch event} can be reused. Reuse - * is only possible if all the following conditions are true: - * - *
<ul>
- *     <li>A pending offset fetch event exists</li>
- *     <li>The partition set of the pending offset fetch event is the same as the given partition set</li>
- *     <li>The pending offset fetch event has not expired</li>
- * </ul>
- */ - private boolean canReusePendingOffsetFetchEvent(Set partitions) { - if (pendingOffsetFetchEvent == null) - return false; - - if (!pendingOffsetFetchEvent.partitions().equals(partitions)) - return false; - - return pendingOffsetFetchEvent.deadlineMs() > time.milliseconds(); - } - private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) { if (offsetAndMetadata != null) offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch)); @@ -2002,10 +1905,6 @@ SubscriptionState subscriptions() { return subscriptions; } - boolean hasPendingOffsetFetchEvent() { - return pendingOffsetFetchEvent != null; - } - private void maybeUpdateSubscriptionMetadata() { if (this.metadataVersionSnapshot < metadata.updateVersion()) { this.metadataVersionSnapshot = metadata.updateVersion(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java index 89f3fefae2bb6..113b8e0b9d6df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java @@ -215,9 +215,9 @@ public static void refreshCommittedOffsets(final Map T getResult(Future future, Timer timer) { + public static T getResult(Future future, long timeoutMs) { try { - return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); + return future.get(timeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { if (e.getCause() instanceof IllegalStateException) throw (IllegalStateException) e.getCause(); @@ -229,6 +229,10 @@ public static T getResult(Future future, Timer timer) { } } + public static T getResult(Future future, Timer timer) { + return getResult(future, timer.remainingMs()); + } + public static T getResult(Future future) { try { return future.get(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java index c219f1d98e77b..7d8eea269a4b7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java @@ -21,16 +21,17 @@ import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.StaleMetadataException; import org.apache.kafka.clients.consumer.LogTruncationException; +import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData; import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult; -import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; -import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.ClusterResourceListener; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.message.ListOffsetsRequestData; import org.apache.kafka.common.requests.AbstractRequest; 
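Editor's note: for illustration, a self-contained Java sketch of the reuse guard described by the conditions above. The types here (PendingFetchSketch, ReuseGuardSketch) are hypothetical stand-ins, not part of the patch; the real code keys this off the FetchCommittedOffsetsEvent in the app thread (removed above) and the PendingFetchCommittedRequest in the background thread (added below), where the background variant keeps the first two conditions but drops the expiration check.

import java.util.Set;

// Hypothetical stand-in for the pending OffsetFetch bookkeeping.
final class PendingFetchSketch {
    final Set<String> requestedPartitions;
    final long deadlineMs;

    PendingFetchSketch(Set<String> requestedPartitions, long deadlineMs) {
        this.requestedPartitions = requestedPartitions;
        this.deadlineMs = deadlineMs;
    }
}

final class ReuseGuardSketch {
    private PendingFetchSketch pending;

    // Reuse only if a pending fetch exists, it covers exactly the requested
    // partitions, and (in the app-thread variant only) it has not expired.
    boolean canReuse(Set<String> partitions, long nowMs) {
        return pending != null
                && pending.requestedPartitions.equals(partitions)
                && pending.deadlineMs > nowMs;
    }
}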
import org.apache.kafka.common.requests.ListOffsetsRequest; @@ -48,14 +49,18 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets; import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.hasUsableOffsetForLeaderEpochVersion; import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupFetchPositionsByLeader; @@ -86,7 +91,23 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis private final Time time; private final ApiVersions apiVersions; private final NetworkClientDelegate networkClientDelegate; - private final BackgroundEventHandler backgroundEventHandler; + private final CommitRequestManager commitRequestManager; + private final long defaultApiTimeoutMs; + + /** + * Exception that occurred while updating positions after the triggering event had already + * expired. It will be propagated and cleared on the next call to update fetch positions. + */ + private final AtomicReference cachedUpdatePositionsException = new AtomicReference<>(); + + /** + * This holds the last OffsetFetch request triggered to retrieve committed offsets to update + * fetch positions that hasn't completed yet. When a response is received, it's used to + * update the fetch positions and the pendingOffsetFetchEvent is cleared. 
If the update fetch + * positions attempt runs out of time before this OffsetFetch gets a response, it will be + * kept to be used on the next attempt to update fetch positions (if partitions remain the same) + */ + private PendingFetchCommittedRequest pendingOffsetFetchEvent; @SuppressWarnings("this-escape") public OffsetsRequestManager(final SubscriptionState subscriptionState, @@ -95,9 +116,10 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState, final Time time, final long retryBackoffMs, final long requestTimeoutMs, + final long defaultApiTimeoutMs, final ApiVersions apiVersions, final NetworkClientDelegate networkClientDelegate, - final BackgroundEventHandler backgroundEventHandler, + final CommitRequestManager commitRequestManager, final LogContext logContext) { requireNonNull(subscriptionState); requireNonNull(metadata); @@ -105,7 +127,6 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState, requireNonNull(time); requireNonNull(apiVersions); requireNonNull(networkClientDelegate); - requireNonNull(backgroundEventHandler); requireNonNull(logContext); this.metadata = metadata; @@ -116,15 +137,27 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState, this.subscriptionState = subscriptionState; this.time = time; this.requestTimeoutMs = requestTimeoutMs; + this.defaultApiTimeoutMs = defaultApiTimeoutMs; this.apiVersions = apiVersions; this.networkClientDelegate = networkClientDelegate; - this.backgroundEventHandler = backgroundEventHandler; this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState, time, retryBackoffMs, apiVersions); // Register the cluster metadata update callback. Note this only relies on the // requestsToRetry initialized above, and won't be invoked until all managers are // initialized and the network thread started. this.metadata.addClusterUpdateListener(this); + this.commitRequestManager = commitRequestManager; + } + + private static class PendingFetchCommittedRequest { + final Set requestedPartitions; + final CompletableFuture> result; + + private PendingFetchCommittedRequest(final Set requestedPartitions, + final CompletableFuture> result) { + this.requestedPartitions = Objects.requireNonNull(requestedPartitions); + this.result = Objects.requireNonNull(result); + } } /** @@ -182,26 +215,265 @@ public CompletableFuture> fetchO result.fetchedOffsets)); } + /** + * Update fetch positions for assigned partitions that do not have a position. This will: + *
<ul>
+ *     <li>check if all assigned partitions already have fetch positions and return right away if that's the case</li>
+ *     <li>trigger an async request to validate positions (detect log truncation)</li>
+ *     <li>fetch committed offsets if enabled, and use the response to update the positions</li>
+ *     <li>fetch partition offsets for partitions that may still require a position, and use the response to
+ * update the positions</li>
+ * </ul>
+ * + * @param deadlineMs Time in milliseconds when the triggering application event expires. Any error received after + * this will be saved, and used to complete the result exceptionally on the next call to this + * function. + * @return Future that will complete with a boolean indicating if all assigned partitions have positions (based + * on {@link SubscriptionState#hasAllFetchPositions()}). It will complete immediately, with true, if all positions + * are already available. If some positions are missing, the future will complete once the offsets are retrieved and positions are updated. + */ + public CompletableFuture updateFetchPositions(long deadlineMs) { + CompletableFuture result = new CompletableFuture<>(); + + try { + if (maybeCompleteWithPreviousException(result)) { + return result; + } + + validatePositionsIfNeeded(); + + if (subscriptionState.hasAllFetchPositions()) { + // All positions are already available + result.complete(true); + return result; + } + + // Some positions are missing, so trigger requests to fetch offsets and update them. + updatePositionsWithOffsets(deadlineMs).whenComplete((__, error) -> { + if (error != null) { + result.completeExceptionally(error); + } else { + result.complete(subscriptionState.hasAllFetchPositions()); + } + }); + + } catch (Exception e) { + result.completeExceptionally(maybeWrapAsKafkaException(e)); + } + return result; + } + + private boolean maybeCompleteWithPreviousException(CompletableFuture result) { + Throwable cachedException = cachedUpdatePositionsException.getAndSet(null); + if (cachedException != null) { + result.completeExceptionally(cachedException); + return true; + } + return false; + } + + /** + * Generate requests to fetch offsets and update positions once a response is received. This will first attempt + * to use the committed offsets if available. If no committed offsets available, it will use the partition + * offsets retrieved from the leader. + */ + private CompletableFuture updatePositionsWithOffsets(long deadlineMs) { + CompletableFuture result = new CompletableFuture<>(); + + cacheExceptionIfEventExpired(result, deadlineMs); + + CompletableFuture updatePositions; + if (commitRequestManager != null) { + CompletableFuture refreshWithCommittedOffsets = initWithCommittedOffsetsIfNeeded(deadlineMs); + + // Reset positions for all partitions that may still require it (or that are awaiting reset) + updatePositions = refreshWithCommittedOffsets.thenCompose(__ -> initWithPartitionOffsetsIfNeeded()); + + } else { + updatePositions = initWithPartitionOffsetsIfNeeded(); + } + + updatePositions.whenComplete((__, resetError) -> { + if (resetError == null) { + result.complete(null); + } else { + result.completeExceptionally(resetError); + } + }); + + return result; + } + + /** + * Save exception that may occur while updating fetch positions. Note that since the update fetch positions + * is triggered asynchronously, errors may be found when the triggering UpdateFetchPositionsEvent has already + * expired. In that case, the exception is saved in memory, to be thrown when processing the following + * UpdateFetchPositionsEvent. + * + * @param result Update fetch positions future to get the exception from (if any) + * @param deadlineMs Deadline of the triggering application event, used to identify if the event has already + * expired when the error in the result future occurs. 
+ */ + private void cacheExceptionIfEventExpired(CompletableFuture result, long deadlineMs) { + result.whenComplete((__, error) -> { + boolean updatePositionsExpired = time.milliseconds() >= deadlineMs; + if (error != null && updatePositionsExpired) { + cachedUpdatePositionsException.set(error); + } + }); + } + + /** + * If there are partitions still needing a position and a reset policy is defined, request reset using the + * default policy. + * + * @return Future that will complete when the reset operation completes retrieving the offsets and setting + * positions in the subscription state using them. + * @throws NoOffsetForPartitionException If no reset strategy is configured. + */ + private CompletableFuture initWithPartitionOffsetsIfNeeded() { + CompletableFuture result = new CompletableFuture<>(); + try { + // Mark partitions that need reset, using the configured reset strategy. If no + // strategy is defined, this will raise a NoOffsetForPartitionException exception. + subscriptionState.resetInitializingPositions(); + } catch (Exception e) { + result.completeExceptionally(e); + return result; + } + + // For partitions awaiting reset, generate a ListOffset request to retrieve the partition + // offsets according to the strategy (ex. earliest, latest), and update the positions. + return resetPositionsIfNeeded(); + } + + /** + * Fetch the committed offsets for partitions that require initialization. This will trigger an OffsetFetch + * request and update positions in the subscription state once a response is received. + * + * @throws TimeoutException If offsets could not be retrieved within the timeout + */ + private CompletableFuture initWithCommittedOffsetsIfNeeded(long deadlineMs) { + final Set initializingPartitions = subscriptionState.initializingPartitions(); + + if (initializingPartitions.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + log.debug("Refreshing committed offsets for partitions {}", initializingPartitions); + CompletableFuture result = new CompletableFuture<>(); + + // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle + // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created + // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the + // case it times out, subsequent attempts will also use the event in order to wait for the results. 
+ if (!canReusePendingOffsetFetchEvent(initializingPartitions)) { + // Generate a new OffsetFetch request and update positions when a response is received + final long fetchCommittedDeadlineMs = Math.max(deadlineMs, time.milliseconds() + defaultApiTimeoutMs); + CompletableFuture> fetchOffsets = + commitRequestManager.fetchOffsets(initializingPartitions, fetchCommittedDeadlineMs); + CompletableFuture> fetchOffsetsAndRefresh = + fetchOffsets.whenComplete((offsets, error) -> { + pendingOffsetFetchEvent = null; + // Update positions with the retrieved offsets + refreshOffsets(offsets, error, result); + }); + pendingOffsetFetchEvent = new PendingFetchCommittedRequest(initializingPartitions, fetchOffsetsAndRefresh); + } else { + // Reuse pending OffsetFetch request that will complete when positions are refreshed with the committed offsets retrieved + pendingOffsetFetchEvent.result.whenComplete((__, error) -> { + if (error == null) { + result.complete(null); + } else { + result.completeExceptionally(error); + } + }); + } + + return result; + } + + /** + * Use the given committed offsets to update positions for partitions that still require it. + * + * @param offsets Committed offsets to use to update positions for initializing partitions. + * @param error Error received in response to the OffsetFetch request. Will be null if the request was successful. + * @param result Future to complete once all positions have been updated with the given committed offsets + */ + private void refreshOffsets(final Map offsets, + final Throwable error, + final CompletableFuture result) { + if (error == null) { + + // Ensure we only set positions for the partitions that still require one (ex. some partitions may have + // been assigned a position manually) + Map offsetsToApply = offsetsForInitializingPartitions(offsets); + + refreshCommittedOffsets(offsetsToApply, metadata, subscriptionState); + + result.complete(null); + + } else { + log.error("Error fetching committed offsets to update positions", error); + result.completeExceptionally(error); + } + } + + /** + * Get the offsets, from the given collection, that belong to partitions that still require a position (partitions + * that are initializing). This is expected to be used to filter out offsets that were retrieved for partitions + * that do not need a position anymore. + * + * @param offsets Offsets per partition + * @return Subset of the offsets associated to partitions that are still initializing + */ + private Map offsetsForInitializingPartitions(Map offsets) { + Set currentlyInitializingPartitions = subscriptionState.initializingPartitions(); + Map result = new HashMap<>(); + offsets.forEach((key, value) -> { + if (currentlyInitializingPartitions.contains(key)) { + result.put(key, value); + } + }); + return result; + } + + /** + * This determines if the {@link #pendingOffsetFetchEvent pending offset fetch event} can be reused. Reuse + * is only possible if all the following conditions are true: + * + *
<ul>
+ *     <li>A pending offset fetch event exists</li>
+ *     <li>The partition set of the pending offset fetch event is the same as the given partitions</li>
+ * </ul>
+ */ + private boolean canReusePendingOffsetFetchEvent(Set partitions) { + if (pendingOffsetFetchEvent == null) { + return false; + } + + return pendingOffsetFetchEvent.requestedPartitions.equals(partitions); + } + /** * Reset offsets for all assigned partitions that require it. Offsets will be reset * with timestamps according to the reset strategy defined for each partition. This will * generate ListOffsets requests for the partitions and timestamps, and enqueue them to be sent * on the next call to {@link #poll(long)}. - * *
<p>
- * * When a response is received, positions are updated in-memory, on the subscription state. If * an error is received in the response, it will be saved to be thrown on the next call to * this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException}) */ - public CompletableFuture resetPositionsIfNeeded() { + CompletableFuture resetPositionsIfNeeded() { Map offsetResetTimestamps; try { offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp(); } catch (Exception e) { - backgroundEventHandler.add(new ErrorEvent(e)); - return CompletableFuture.completedFuture(null); + CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally(e); + return result; } if (offsetResetTimestamps.isEmpty()) @@ -218,18 +490,17 @@ public CompletableFuture resetPositionsIfNeeded() { * *
<p>
* - * When a response is received, positions are validated and, if a log truncation is - * detected, a {@link LogTruncationException} will be saved in memory, to be thrown on the + * When a response is received, positions are validated and, if a log truncation is detected, a + * {@link LogTruncationException} will be saved in memory in cachedUpdatePositionsException, to be thrown on the * next call to this function. */ - public CompletableFuture validatePositionsIfNeeded() { - Map partitionsToValidate = - offsetFetcherUtils.getPartitionsToValidate(); + void validatePositionsIfNeeded() { + Map partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate(); if (partitionsToValidate.isEmpty()) { - return CompletableFuture.completedFuture(null); + return; } - return sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate); + sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate); } /** @@ -435,19 +706,15 @@ private CompletableFuture sendListOffsetsRequestsAndResetPositions( * This also adds the request to the list of unsentRequests. * * @param partitionsToValidate a map of topic-partition positions to validate - * @return A {@link CompletableFuture} which completes when the requests are - * complete. */ - private CompletableFuture sendOffsetsForLeaderEpochRequestsAndValidatePositions( + private void sendOffsetsForLeaderEpochRequestsAndValidatePositions( Map partitionsToValidate) { final Map> regrouped = regroupFetchPositionsByLeader(partitionsToValidate); long nextResetTimeMs = time.milliseconds() + requestTimeoutMs; - final AtomicInteger expectedResponses = new AtomicInteger(0); - final CompletableFuture globalResult = new CompletableFuture<>(); final List unsentRequests = new ArrayList<>(); regrouped.forEach((node, fetchPositions) -> { @@ -491,20 +758,10 @@ private CompletableFuture sendOffsetsForLeaderEpochRequestsAndValidatePosi } offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e); } - if (expectedResponses.decrementAndGet() == 0) { - globalResult.complete(null); - } }); }); - if (unsentRequests.isEmpty()) { - globalResult.complete(null); - } else { - expectedResponses.set(unsentRequests.size()); - requestsToSend.addAll(unsentRequests); - } - - return globalResult; + requestsToSend.addAll(unsentRequests); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 9925e21571ada..c4aecf75c1063 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -166,16 +166,8 @@ protected RequestManagers create() { long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); final int requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); - final OffsetsRequestManager listOffsets = new OffsetsRequestManager(subscriptions, - metadata, - fetchConfig.isolationLevel, - time, - retryBackoffMs, - requestTimeoutMs, - apiVersions, - networkClientDelegate, - backgroundEventHandler, - logContext); + final int defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); + final FetchRequestManager fetch = new FetchRequestManager(logContext, time, metadata, @@ -192,7 +184,7 @@ protected RequestManagers create() { 
ConsumerHeartbeatRequestManager heartbeatRequestManager = null; ConsumerMembershipManager membershipManager = null; CoordinatorRequestManager coordinator = null; - CommitRequestManager commit = null; + CommitRequestManager commitRequestManager = null; if (groupRebalanceConfig != null && groupRebalanceConfig.groupId != null) { Optional serverAssignor = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); @@ -202,7 +194,7 @@ protected RequestManagers create() { retryBackoffMaxMs, backgroundEventHandler, groupRebalanceConfig.groupId); - commit = new CommitRequestManager( + commitRequestManager = new CommitRequestManager( time, logContext, subscriptions, @@ -218,14 +210,14 @@ protected RequestManagers create() { groupRebalanceConfig.rebalanceTimeoutMs, serverAssignor, subscriptions, - commit, + commitRequestManager, metadata, logContext, clientTelemetryReporter, backgroundEventHandler, time, metrics); - membershipManager.registerStateListener(commit); + membershipManager.registerStateListener(commitRequestManager); membershipManager.registerStateListener(applicationThreadMemberStateListener); heartbeatRequestManager = new ConsumerHeartbeatRequestManager( logContext, @@ -238,13 +230,25 @@ protected RequestManagers create() { metrics); } + final OffsetsRequestManager listOffsets = new OffsetsRequestManager(subscriptions, + metadata, + fetchConfig.isolationLevel, + time, + retryBackoffMs, + requestTimeoutMs, + defaultApiTimeoutMs, + apiVersions, + networkClientDelegate, + commitRequestManager, + logContext); + return new RequestManagers( logContext, listOffsets, topic, fetch, Optional.ofNullable(coordinator), - Optional.ofNullable(commit), + Optional.ofNullable(commitRequestManager), Optional.ofNullable(heartbeatRequestManager), Optional.ofNullable(membershipManager) ); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index 4b0584b5bb8c1..15525957fc136 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -30,7 +30,7 @@ public abstract class ApplicationEvent { public enum Type { COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, - LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE, + LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE, UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, COMMIT_ON_CLOSE, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java index d8fc13830f716..0baafcd3038d1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; import org.apache.kafka.clients.consumer.internals.RequestManagers; +import 
org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; @@ -110,6 +111,12 @@ public long maximumTimeToWait() { public T addAndGet(final CompletableApplicationEvent event) { Objects.requireNonNull(event, "CompletableApplicationEvent provided to addAndGet must be non-null"); add(event); + // Check if the thread was interrupted before we start waiting, to ensure that we + // propagate the exception even if we end up not having to wait (the event could complete + // between the time it's added and the time we attempt to getResult) + if (Thread.interrupted()) { + throw new InterruptException("Interrupted waiting for results for application event " + event); + } return ConsumerUtils.getResult(event.future()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 6ce8737c78a5e..6ae6c23f6ee1b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -99,12 +99,8 @@ public void process(ApplicationEvent event) { process((ListOffsetsEvent) event); return; - case RESET_POSITIONS: - process((ResetPositionsEvent) event); - return; - - case VALIDATE_POSITIONS: - process((ValidatePositionsEvent) event); + case CHECK_AND_UPDATE_POSITIONS: + process((CheckAndUpdatePositionsEvent) event); return; case SUBSCRIPTION_CHANGE: @@ -259,13 +255,12 @@ private void process(final UnsubscribeEvent event) { } } - private void process(final ResetPositionsEvent event) { - CompletableFuture future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded(); - future.whenComplete(complete(event.future())); - } - - private void process(final ValidatePositionsEvent event) { - CompletableFuture future = requestManagers.offsetsRequestManager.validatePositionsIfNeeded(); + /** + * Check if all assigned partitions have fetch positions. If there are missing positions, fetch offsets and use + * them to update positions in the subscription state. + */ + private void process(final CheckAndUpdatePositionsEvent event) { + CompletableFuture future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs()); future.whenComplete(complete(event.future())); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java similarity index 52% rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java index 86dbb80c0f0ac..2c7fdd7464283 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java @@ -17,14 +17,19 @@ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; + /** - * Event for resetting offsets for all assigned partitions that require it. 
This is an - * asynchronous event that generates ListOffsets requests, and completes by updating in-memory - * positions when responses are received. + * Event to check if all assigned partitions have fetch positions. If there are positions missing, it will fetch + * offsets and update positions when it gets them. This will first attempt to use the committed offsets if available. If + * no committed offsets are available, it will use the partition offsets retrieved from the leader. + *
<p>
+ * The event completes with a boolean indicating if all assigned partitions have valid fetch positions + * (based on {@link SubscriptionState#hasAllFetchPositions()}). */ -public class ResetPositionsEvent extends CompletableApplicationEvent { +public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent { - public ResetPositionsEvent(final long deadlineMs) { - super(Type.RESET_POSITIONS, deadlineMs); + public CheckAndUpdatePositionsEvent(long deadlineMs) { + super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs); } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java deleted file mode 100644 index a93ff9859a58e..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.clients.consumer.internals.events; - -/** - * Event for validating offsets for all assigned partitions for which a leader change has been - * detected. This is an asynchronous event that generates OffsetForLeaderEpoch requests, and - * completes by validating in-memory positions against the offsets received in the responses. 
- */ -public class ValidatePositionsEvent extends CompletableApplicationEvent { - - public ValidatePositionsEvent(final long deadlineMs) { - super(Type.VALIDATE_POSITIONS, deadlineMs); - } -} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 2526b6447a4c6..ce7cd17c9965a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent; import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent; import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; @@ -44,12 +45,10 @@ import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; -import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent; import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent; import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; -import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -386,8 +385,7 @@ public void testWakeupBeforeCallingPoll() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); completeCommitSyncApplicationEventSuccessfully(); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); @@ -409,8 +407,7 @@ public void testWakeupAfterEmptyFetch() { consumer.wakeup(); return Fetch.empty(); }).doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); completeCommitSyncApplicationEventSuccessfully(); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); @@ -434,8 +431,7 @@ public void testWakeupAfterNonEmptyFetch() { consumer.wakeup(); return Fetch.forPartition(tp, records, true); 
}).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class)); - Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); completeCommitSyncApplicationEventSuccessfully(); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); @@ -454,6 +450,7 @@ public void testCommitInRebalanceCallback() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class)); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); SortedSet sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); sortedPartitions.add(tp); CompletableBackgroundEvent e = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, sortedPartitions); @@ -496,8 +493,7 @@ public void testSubscriptionRegexEvalOnPollOnlyIfMetadataChanges() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - Map offsets = Collections.singletonMap(tp, new OffsetAndMetadata(1)); - completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); doReturn(cluster).when(metadata).fetch(); doReturn(Collections.singleton(topicName)).when(cluster).topics(); @@ -529,8 +525,7 @@ public void testClearWakeupTriggerAfterPoll() { ); doReturn(Fetch.forPartition(tp, records, true)) .when(fetchCollector).collectFetch(any(FetchBuffer.class)); - Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); completeCommitSyncApplicationEventSuccessfully(); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); @@ -575,157 +570,6 @@ public void testPollLongThrowsException() { "This method is deprecated and will be removed in the next major release.", e.getMessage()); } - @Test - public void testOffsetFetchStoresPendingEvent() { - consumer = newConsumer(); - long timeoutMs = 0; - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); - - // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within the - // timeout, leaving a pending fetch. - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event = getLastEnqueuedEvent(); - assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs))); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - clearInvocations(applicationEventHandler); - - // For the second attempt, the event is reused, so first verify that another FetchCommittedOffsetsEvent - // was not enqueued. 
On this attempt the Future returns successfully, clearing the pending fetch. - event.future().complete(Collections.emptyMap()); - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler, never()).add(any(FetchCommittedOffsetsEvent.class)); - assertDoesNotThrow(() -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs))); - assertFalse(consumer.hasPendingOffsetFetchEvent()); - } - - @Test - public void testOffsetFetchDoesNotReuseMismatchedPendingEvent() { - consumer = newConsumer(); - long timeoutMs = 0; - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - - // The first attempt at poll() retrieves data for partition 0 of the topic. poll() creates an event, - // enqueues it, but its Future does not complete within the timeout, leaving a pending fetch. - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event1 = getLastEnqueuedEvent(); - assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs))); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - clearInvocations(applicationEventHandler); - - // For the second attempt, the set of partitions is reassigned, causing the pending offset to be replaced. - // Verify that another FetchCommittedOffsetsEvent is enqueued. - consumer.assign(Collections.singleton(new TopicPartition("topic1", 1))); - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event2 = getLastEnqueuedEvent(); - assertNotEquals(event1, event2); - assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs))); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - clearInvocations(applicationEventHandler); - - // For the third attempt, the event from attempt 2 is reused, so there should not have been another - // FetchCommittedOffsetsEvent enqueued. The Future is completed to make it return successfully in poll(). - // This will finally clear out the pending fetch. - event2.future().complete(Collections.emptyMap()); - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler, never()).add(any(FetchCommittedOffsetsEvent.class)); - assertDoesNotThrow(() -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs))); - assertFalse(consumer.hasPendingOffsetFetchEvent()); - } - - @Test - public void testOffsetFetchDoesNotReuseExpiredPendingEvent() { - consumer = newConsumer(); - long timeoutMs = 0; - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); - - // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within - // the timeout, leaving a pending fetch. 
- consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event1 = getLastEnqueuedEvent(); - assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs))); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - clearInvocations(applicationEventHandler); - - // Sleep past the event's expiration, causing the poll() to *not* reuse the pending fetch. A new event - // is created and added to the application event queue. - time.sleep(event1.deadlineMs() - time.milliseconds()); - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event2 = getLastEnqueuedEvent(); - assertNotEquals(event1, event2); - assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs))); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - } - - @Test - public void testOffsetFetchTimeoutExceptionKeepsPendingEvent() { - consumer = newConsumer(); - long timeoutMs = 0; - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); - - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event = getLastEnqueuedEvent(); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - event.future().completeExceptionally(new TimeoutException("Test error")); - assertDoesNotThrow(() -> consumer.poll(Duration.ofMillis(timeoutMs))); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - } - - @Test - public void testOffsetFetchInterruptExceptionKeepsPendingEvent() { - consumer = newConsumer(); - long timeoutMs = 0; - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); - - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event = getLastEnqueuedEvent(); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - event.future().completeExceptionally(new InterruptException("Test error")); - assertThrows(InterruptException.class, () -> consumer.poll(Duration.ofMillis(timeoutMs))); - assertTrue(Thread.interrupted()); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - } - - @Test - public void testOffsetFetchUnexpectedExceptionClearsPendingEvent() { - consumer = newConsumer(); - long timeoutMs = 0; - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); - - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event = getLastEnqueuedEvent(); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - event.future().completeExceptionally(new NullPointerException("Test error")); - assertThrows(KafkaException.class, () -> consumer.poll(Duration.ofMillis(timeoutMs))); - assertFalse(consumer.hasPendingOffsetFetchEvent()); - } - @Test public void testCommitSyncLeaderEpochUpdate() { consumer = newConsumer(); @@ -801,7 +645,6 
@@ public void testCommitAsyncTriggersFencedExceptionFromCommitAsync() { consumer = newConsumer(config); completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception()); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap()); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); final TopicPartition tp = new TopicPartition("foo", 0); completeAssignmentChangeEventSuccessfully(); @@ -825,7 +668,6 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { consumer = newConsumer(config); completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception()); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap()); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); final TopicPartition tp = new TopicPartition("foo", 0); completeAssignmentChangeEventSuccessfully(); @@ -925,7 +767,6 @@ public void testPollTriggersFencedExceptionFromCommitAsync() { consumer = newConsumer(config); completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception()); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap()); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); final TopicPartition tp = new TopicPartition("foo", 0); completeAssignmentChangeEventSuccessfully(); @@ -945,7 +786,7 @@ public void testEnsurePollExecutedCommitAsyncCallbacks() { MockCommitCallback callback = new MockCommitCallback(); completeCommitAsyncApplicationEventSuccessfully(); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap()); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); completeAssignmentChangeEventSuccessfully(); consumer.assign(Collections.singleton(new TopicPartition("foo", 0))); @@ -1443,25 +1284,6 @@ public void testNoInterceptorCommitAsyncFailed() { assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); } - @Test - public void testRefreshCommittedOffsetsSuccess() { - consumer = newConsumer(); - completeCommitSyncApplicationEventSuccessfully(); - TopicPartition partition = new TopicPartition("t1", 1); - Set partitions = Collections.singleton(partition); - Map committedOffsets = Collections.singletonMap(partition, new OffsetAndMetadata(10L)); - testRefreshCommittedOffsetsSuccess(partitions, committedOffsets); - } - - @Test - public void testRefreshCommittedOffsetsSuccessButNoCommittedOffsetsFound() { - consumer = newConsumer(); - TopicPartition partition = new TopicPartition("t1", 1); - Set partitions = Collections.singleton(partition); - Map committedOffsets = Collections.emptyMap(); - testRefreshCommittedOffsetsSuccess(partitions, committedOffsets); - } - @Test public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() { consumer = newConsumer(); @@ -1694,6 +1516,7 @@ public void testListenerCallbacksInvoke(List consumer.poll(Duration.ZERO)); } @@ -1865,6 +1688,7 @@ public void testEnsurePollEventSentOnConsumerPoll() { doAnswer(invocation -> Fetch.forPartition(tp, records, true)) .when(fetchCollector) .collectFetch(Mockito.any(FetchBuffer.class)); + 
 
         consumer.subscribe(singletonList("topic1"));
         consumer.poll(Duration.ofMillis(100));
@@ -1880,6 +1704,7 @@ private Properties requiredConsumerConfigAndGroupId(final String groupId) {
     private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean committedOffsetsEnabled) {
         completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException());
         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(new TopicPartition("t1", 1)));
 
@@ -1887,40 +1712,7 @@ private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean co
         consumer.poll(Duration.ZERO);
 
         verify(applicationEventHandler, atLeast(1))
-            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class));
-
-        if (committedOffsetsEnabled) {
-            // Verify there was an FetchCommittedOffsets event and no ResetPositions event
-            verify(applicationEventHandler, atLeast(1))
-                .add(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
-            verify(applicationEventHandler, never())
-                .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class));
-        } else {
-            // Verify there was not any FetchCommittedOffsets event but there should be a ResetPositions
-            verify(applicationEventHandler, never())
-                .add(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
-            verify(applicationEventHandler, atLeast(1))
-                .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class));
-        }
-    }
-
-    private void testRefreshCommittedOffsetsSuccess(Set<TopicPartition> partitions,
-                                                    Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
-        completeFetchedCommittedOffsetApplicationEventSuccessfully(committedOffsets);
-        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
-
-        completeAssignmentChangeEventSuccessfully();
-        consumer.assign(partitions);
-
-        consumer.poll(Duration.ZERO);
-
-        verify(applicationEventHandler, atLeast(1))
-            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class));
-        verify(applicationEventHandler, atLeast(1))
-            .add(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
-        verify(applicationEventHandler, atLeast(1))
-            .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class));
+            .addAndGet(ArgumentMatchers.isA(CheckAndUpdatePositionsEvent.class));
     }
 
     @Test
@@ -1947,6 +1739,7 @@ public void testLongPollWaitIsLimited() {
         }).doAnswer(invocation ->
             Fetch.forPartition(tp, records, true)
         ).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
 
         // And then poll for up to 10000ms, which should return 2 records without timing out
         ConsumerRecords<String, String> returnedRecords = consumer.poll(Duration.ofMillis(10000));
@@ -2041,8 +1834,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted() {
         final int partition = 3;
         final TopicPartition tp = new TopicPartition(topicName, partition);
         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
         doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
 
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
@@ -2079,6 +1871,7 @@ void testReaperInvokedInPoll() {
         consumer = newConsumer();
         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         consumer.subscribe(Collections.singletonList("topic"));
+        when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
         consumer.poll(Duration.ZERO);
         verify(backgroundEventReaper).reap(time.milliseconds());
     }
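Between the consumer-level tests above and the manager-level tests below, the division of labor is: the consumer enqueues a single CheckAndUpdatePositionsEvent, and OffsetsRequestManager performs validation, the committed-offset fetch, and any reset in the background thread, completing one future. A rough sketch of the committed-offsets leg that OffsetsRequestManagerTest drives below, assuming the manager's fields (subscriptionState, commitRequestManager, defaultApiTimeoutMs) and a hypothetical applyCommittedOffsets helper; the real manager in this patch also keeps a pending OffsetFetch request across attempts:

    // Sketch, not the patch's verbatim code: ask CommitRequestManager for committed
    // offsets, then seek only the partitions that are still initializing when the
    // response arrives, and report whether all positions are now known.
    public CompletableFuture<Boolean> updateFetchPositions(long currentTimeMs) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        Set<TopicPartition> initializing = subscriptionState.initializingPartitions();
        commitRequestManager.fetchOffsets(initializing, currentTimeMs + defaultApiTimeoutMs)
            .whenComplete((offsets, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                    return;
                }
                // Hypothetical helper: seekUnvalidated() for still-initializing partitions
                applyCommittedOffsets(offsets);
                result.complete(subscriptionState.hasAllFetchPositions());
            });
        return result;
    }

The boolean result mirrors what the tests assert: the returned future completes only once the OffsetFetch response has been applied, without ever blocking the background thread.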
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index c28202efa01cc..e270c039eeb4a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -20,10 +20,8 @@
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.IsolationLevel;
@@ -43,7 +41,7 @@
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -60,25 +58,25 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -90,9 +88,9 @@ public class OffsetsRequestManagerTest {
     private OffsetsRequestManager requestManager;
     private ConsumerMetadata metadata;
     private SubscriptionState subscriptionState;
-    private MockTime time;
+    private final Time time = mock(Time.class);
     private ApiVersions apiVersions;
-    private BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final CommitRequestManager commitRequestManager = mock(CommitRequestManager.class);
 
     private static final String TEST_TOPIC = "t1";
     private static final TopicPartition TEST_PARTITION_1 = new TopicPartition(TEST_TOPIC, 1);
     private static final TopicPartition TEST_PARTITION_2 = new TopicPartition(TEST_TOPIC, 2);
@@ -101,15 +99,13 @@ public class OffsetsRequestManagerTest {
     private static final IsolationLevel DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_COMMITTED;
     private static final int RETRY_BACKOFF_MS = 500;
     private static final int REQUEST_TIMEOUT_MS = 500;
+    private static final int DEFAULT_API_TIMEOUT_MS = 500;
 
     @BeforeEach
     public void setup() {
         LogContext logContext = new LogContext();
-        backgroundEventQueue = new LinkedBlockingQueue<>();
-        BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
         metadata = mock(ConsumerMetadata.class);
         subscriptionState = mock(SubscriptionState.class);
-        time = new MockTime(0);
         apiVersions = mock(ApiVersions.class);
         requestManager = new OffsetsRequestManager(
             subscriptionState,
@@ -118,9 +114,10 @@ public void setup() {
             time,
             RETRY_BACKOFF_MS,
             REQUEST_TIMEOUT_MS,
+            DEFAULT_API_TIMEOUT_MS,
             apiVersions,
             mock(NetworkClientDelegate.class),
-            backgroundEventHandler,
+            commitRequestManager,
             logContext
         );
     }
@@ -541,39 +538,29 @@ public void testResetOffsetsAuthorizationFailure() {
         when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST);
         mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
 
-        requestManager.resetPositionsIfNeeded();
+        CompletableFuture<Void> resetResult = requestManager.resetPositionsIfNeeded();
 
         // Reset positions response with TopicAuthorizationException
         NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds());
         NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0);
+        assertFalse(resetResult.isDone());
 
         Errors topicAuthorizationFailedError = Errors.TOPIC_AUTHORIZATION_FAILED;
         ClientResponse clientResponse = buildClientResponseWithErrors(
             unsentRequest, Collections.singletonMap(TEST_PARTITION_1, topicAuthorizationFailedError));
         clientResponse.onComplete();
 
         assertTrue(unsentRequest.future().isDone());
+        assertTrue(resetResult.isDone());
         assertFalse(unsentRequest.future().isCompletedExceptionally());
         verify(subscriptionState).requestFailed(any(), anyLong());
         verify(metadata).requestUpdate(false);
 
-        // Following resetPositions should enqueue the previous exception in the background event queue
-        // without performing any request
-        assertDoesNotThrow(() -> requestManager.resetPositionsIfNeeded());
+        // Following resetPositions should throw the exception
+        CompletableFuture<Void> nextReset = assertDoesNotThrow(() -> requestManager.resetPositionsIfNeeded());
        assertEquals(0, requestManager.requestsToSend());
-
-        // Check that the event was enqueued during resetPositionsIfNeeded
-        assertEquals(1, backgroundEventQueue.size());
-        BackgroundEvent event = backgroundEventQueue.poll();
-        assertNotNull(event);
-
-        // Check that the event itself is of the expected type
-        assertInstanceOf(ErrorEvent.class, event);
-        ErrorEvent errorEvent = (ErrorEvent) event;
-        assertNotNull(errorEvent.error());
-
-        // Check that the error held in the event is of the expected type
-        assertInstanceOf(topicAuthorizationFailedError.exception().getClass(), errorEvent.error());
+        assertTrue(nextReset.isCompletedExceptionally());
+        assertFutureThrows(nextReset, TopicAuthorizationException.class);
     }
@@ -672,6 +659,107 @@ public void testValidatePositionsAbortIfNoApiVersionsToCheckAgainstThenRecovers(
         assertEquals(1, requestManager.requestsToSend(), "Invalid request count");
     }
 
+    @Test
+    public void testUpdatePositionsWithCommittedOffsets() {
+        long internalFetchCommittedTimeout = time.milliseconds() + DEFAULT_API_TIMEOUT_MS;
+        TopicPartition tp1 = new TopicPartition("topic1", 1);
+        Set<TopicPartition> initPartitions1 = Collections.singleton(tp1);
+        Metadata.LeaderAndEpoch leaderAndEpoch = testLeaderEpoch(LEADER_1, Optional.of(1));
+
+        // tp1 assigned and requires a position
+        mockAssignedPartitionsMissingPositions(initPartitions1, initPartitions1, leaderAndEpoch);
+
+        // Call to updateFetchPositions. Should send an OffsetFetch request and use the response to set positions
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = new CompletableFuture<>();
+        when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult);
+        CompletableFuture<Boolean> updatePositions1 = requestManager.updateFetchPositions(time.milliseconds());
+        assertFalse(updatePositions1.isDone(), "Update positions should wait for the OffsetFetch request");
+        verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout);
+
+        // Receive response with committed offsets. Should complete the updatePositions operation (the set
+        // of initializing partitions hasn't changed)
+        when(subscriptionState.initializingPartitions()).thenReturn(initPartitions1);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(10, Optional.of(1), "");
+        fetchResult.complete(Collections.singletonMap(tp1, offsetAndMetadata));
+
+        assertTrue(updatePositions1.isDone(), "Update positions should complete after the OffsetFetch response");
+        SubscriptionState.FetchPosition expectedPosition = new SubscriptionState.FetchPosition(
+            offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(), leaderAndEpoch);
+        verify(subscriptionState).seekUnvalidated(tp1, expectedPosition);
+    }
+
+    @Test
+    public void testUpdatePositionsWithCommittedOffsetsReusesRequest() {
+        long internalFetchCommittedTimeout = time.milliseconds() + DEFAULT_API_TIMEOUT_MS;
+        TopicPartition tp1 = new TopicPartition("topic1", 1);
+        Set<TopicPartition> initPartitions1 = Collections.singleton(tp1);
+        Metadata.LeaderAndEpoch leaderAndEpoch = testLeaderEpoch(LEADER_1, Optional.of(1));
+
+        // tp1 assigned and requires a position
+        mockAssignedPartitionsMissingPositions(initPartitions1, initPartitions1, leaderAndEpoch);
+
+        // call to updateFetchPositions. Should send an OffsetFetch request
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = new CompletableFuture<>();
+        when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult);
+        CompletableFuture<Boolean> updatePositions1 = requestManager.updateFetchPositions(time.milliseconds());
+        assertFalse(updatePositions1.isDone(), "Update positions should wait for the OffsetFetch request");
+        verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout);
+        clearInvocations(commitRequestManager);
+
+        // Call to updateFetchPositions again with the same set of initializing partitions should reuse request
+        CompletableFuture<Boolean> updatePositions2 = requestManager.updateFetchPositions(time.milliseconds());
+        verify(commitRequestManager, never()).fetchOffsets(initPartitions1, internalFetchCommittedTimeout);
+
+        // Receive response with committed offsets, should complete both calls
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(10, Optional.of(1), "");
+        fetchResult.complete(Collections.singletonMap(tp1, offsetAndMetadata));
+
+        assertTrue(updatePositions1.isDone());
+        assertTrue(updatePositions2.isDone());
+        SubscriptionState.FetchPosition expectedPosition = new SubscriptionState.FetchPosition(
+            offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(), leaderAndEpoch);
+        verify(subscriptionState).seekUnvalidated(tp1, expectedPosition);
+    }
+
+    @Test
+    public void testUpdatePositionsDoesNotApplyOffsetsIfPartitionNotInitializingAnymore() {
+        long internalFetchCommittedTimeout = time.milliseconds() + DEFAULT_API_TIMEOUT_MS;
+        TopicPartition tp1 = new TopicPartition("topic1", 1);
+        Set<TopicPartition> initPartitions1 = Collections.singleton(tp1);
+        Metadata.LeaderAndEpoch leaderAndEpoch = testLeaderEpoch(LEADER_1, Optional.of(1));
+
+        // tp1 assigned and requires a position
+        mockAssignedPartitionsMissingPositions(initPartitions1, initPartitions1, leaderAndEpoch);
+
+        // call to updateFetchPositions will trigger an OffsetFetch request for tp1 (won't complete just yet)
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = new CompletableFuture<>();
+        when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult);
+        CompletableFuture<Boolean> updatePositions1 = requestManager.updateFetchPositions(time.milliseconds());
+        assertFalse(updatePositions1.isDone());
+        verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout);
+        clearInvocations(commitRequestManager);
+
+        // tp1 does not require a position anymore (ex. removed from the assignment, or got a position manually via
+        // seek). When the OffsetFetch response is received, it should not update the position for tp1 to the
+        // committed offset
+        when(subscriptionState.initializingPartitions()).thenReturn(Collections.emptySet());
+        fetchResult.complete(Collections.singletonMap(tp1, new OffsetAndMetadata(5)));
+        verify(subscriptionState, never()).seekUnvalidated(any(), any());
+    }
+
+    private void mockAssignedPartitionsMissingPositions(Set<TopicPartition> assignedPartitions,
+                                                        Set<TopicPartition> initializingPartitions,
+                                                        Metadata.LeaderAndEpoch leaderAndEpoch) {
+        when(subscriptionState.partitionsNeedingValidation(anyLong())).thenReturn(Collections.emptySet());
+        assignedPartitions.forEach(tp -> {
+            when(subscriptionState.isAssigned(tp)).thenReturn(true);
+            when(metadata.currentLeader(tp)).thenReturn(leaderAndEpoch);
+        });
+
+        when(subscriptionState.hasAllFetchPositions()).thenReturn(false);
+        when(subscriptionState.initializingPartitions()).thenReturn(initializingPartitions);
+    }
+
     private void mockSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition position, Node leader) {
         when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1));
         when(subscriptionState.position(any())).thenReturn(position, position);
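The ApplicationEventProcessorTest changes below register the new event in the parameterized event list and drop the dedicated reset-positions test. For orientation, the processor side plausibly routes the event along these lines (a sketch consistent with the event and manager names in this patch, not verbatim source):

    // Sketch: hand the event to the offsets manager in the background thread and
    // chain the manager's future to the event's future, which is what unblocks the
    // application thread waiting in ApplicationEventHandler.addAndGet().
    private void process(final CheckAndUpdatePositionsEvent event) {
        CompletableFuture<Boolean> future =
            requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
        future.whenComplete((hasAllPositions, error) -> {
            if (error != null)
                event.future().completeExceptionally(error);
            else
                event.future().complete(hasAllPositions);
        });
    }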
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 84a7ac84d1c7f..80315099fb892 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -68,6 +68,7 @@ public class ApplicationEventProcessorTest {
     private final CommitRequestManager commitRequestManager = mock(CommitRequestManager.class);
     private final ConsumerHeartbeatRequestManager heartbeatRequestManager = mock(ConsumerHeartbeatRequestManager.class);
     private final ConsumerMembershipManager membershipManager = mock(ConsumerMembershipManager.class);
+    private final OffsetsRequestManager offsetsRequestManager = mock(OffsetsRequestManager.class);
     private final SubscriptionState subscriptionState = mock(SubscriptionState.class);
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
     private ApplicationEventProcessor processor;
@@ -75,7 +76,7 @@ public class ApplicationEventProcessorTest {
     private void setupProcessor(boolean withGroupId) {
         RequestManagers requestManagers = new RequestManagers(
             new LogContext(),
-            mock(OffsetsRequestManager.class),
+            offsetsRequestManager,
             mock(TopicMetadataRequestManager.class),
             mock(FetchRequestManager.class),
             withGroupId ? Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(),
@@ -128,8 +129,7 @@ private static Stream<Arguments> applicationEvents() {
             Arguments.of(new PollEvent(100)),
             Arguments.of(new AsyncCommitEvent(new HashMap<>())),
             Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
-            Arguments.of(new ResetPositionsEvent(500)),
-            Arguments.of(new ValidatePositionsEvent(500)),
+            Arguments.of(new CheckAndUpdatePositionsEvent(500)),
             Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
             Arguments.of(new AssignmentChangeEvent(12345, 12345, Collections.emptyList())));
     }
@@ -144,14 +144,6 @@ public void testListOffsetsEventIsProcessed(boolean requireTimestamp) {
         verify(applicationEventProcessor).process(any(ListOffsetsEvent.class));
     }
 
-    @Test
-    public void testResetPositionsProcess() {
-        ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class);
-        ResetPositionsEvent event = new ResetPositionsEvent(calculateDeadlineMs(time, 100));
-        applicationEventProcessor.process(event);
-        verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
-    }
-
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testAssignmentChangeEvent(boolean withGroupId) {
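A natural counterpart to the removed testResetPositionsProcess would assert that the consolidated event reaches the offsets manager. A sketch of such a test using the mocks this class already defines (illustrative only, not part of the patch):

    @Test
    public void testCheckAndUpdatePositionsEventIsProcessed() {
        // Assumes the processor completes the event's future from the manager's future.
        when(offsetsRequestManager.updateFetchPositions(anyLong()))
            .thenReturn(CompletableFuture.completedFuture(true));
        setupProcessor(true);
        CheckAndUpdatePositionsEvent event = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(time, 100));
        processor.process(event);
        verify(offsetsRequestManager).updateFetchPositions(anyLong());
        assertTrue(event.future().isDone());
    }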