diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 06ee3bc3616d2..b88a25f907b3b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -42,6 +42,7 @@ import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent; import org.apache.kafka.clients.consumer.internals.events.CommitEvent; import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; @@ -54,13 +55,11 @@ import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; -import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent; import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent; import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; -import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent; import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics; import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager; import 
org.apache.kafka.common.Cluster; @@ -130,7 +129,6 @@ import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState; -import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets; import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; import static org.apache.kafka.common.utils.Utils.closeQuietly; import static org.apache.kafka.common.utils.Utils.isBlank; @@ -254,8 +252,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); private final AtomicInteger refCount = new AtomicInteger(0); - private FetchCommittedOffsetsEvent pendingOffsetFetchEvent; - AsyncKafkaConsumer(final ConsumerConfig config, final Deserializer keyDeserializer, final Deserializer valueDeserializer) { @@ -1576,48 +1572,28 @@ private Fetch collectFetch() { return fetch; } + /** * Set the fetch position to the committed position (if there is one) * or reset it using the offset reset policy the user has configured. * - * @throws AuthenticationException If authentication fails. See the exception for more details - * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is - * defined * @return true iff the operation completed without timing out + * @throws AuthenticationException If authentication fails. 
See the exception for more details + * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is + * defined */ private boolean updateFetchPositions(final Timer timer) { + cachedSubscriptionHasAllFetchPositions = false; try { - // Validate positions using the partition leader end offsets, to detect if any partition - // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch - // request, retrieve the partition end offsets, and validate the current position against it. - applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer))); - - cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions(); - if (cachedSubscriptionHasAllFetchPositions) return true; - - // Reset positions using committed offsets retrieved from the group coordinator, for any - // partitions which do not have a valid position and are not awaiting reset. This will - // trigger an OffsetFetch request and update positions with the offsets retrieved. This - // will only do a coordinator lookup if there are partitions which have missing - // positions, so a consumer with manually assigned partitions can avoid a coordinator - // dependence by always ensuring that assigned partitions have an initial position. - if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer)) - return false; - - // If there are partitions still needing a position and a reset policy is defined, - // request reset using the default policy. If no reset strategy is defined and there - // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception. - subscriptions.resetInitializingPositions(); - - // Reset positions using partition offsets retrieved from the leader, for any partitions - // which are awaiting reset. This will trigger a ListOffset request, retrieve the - // partition offsets according to the strategy (ex. 
earliest, latest), and update the - // positions. - applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer))); - return true; + CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer)); + wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future()); + cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent); } catch (TimeoutException e) { return false; + } finally { + wakeupTrigger.clearTask(); } + return true; } /** @@ -1629,79 +1605,6 @@ private boolean isCommittedOffsetsManagementEnabled() { return groupMetadata.get().isPresent(); } - /** - * Refresh the committed offsets for partitions that require initialization. - * - * @param timer Timer bounding how long this method can block - * @return true iff the operation completed within the timeout - */ - private boolean initWithCommittedOffsetsIfNeeded(Timer timer) { - final Set initializingPartitions = subscriptions.initializingPartitions(); - - if (initializingPartitions.isEmpty()) - return true; - - log.debug("Refreshing committed offsets for partitions {}", initializingPartitions); - - // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle - // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created - // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the - // case it times out, subsequent attempts will also use the event in order to wait for the results. - if (!canReusePendingOffsetFetchEvent(initializingPartitions)) { - // Give the event a reasonable amount of time to complete. 
- final long timeoutMs = Math.max(defaultApiTimeoutMs, timer.remainingMs()); - final long deadlineMs = calculateDeadlineMs(time, timeoutMs); - pendingOffsetFetchEvent = new FetchCommittedOffsetsEvent(initializingPartitions, deadlineMs); - applicationEventHandler.add(pendingOffsetFetchEvent); - } - - final CompletableFuture> future = pendingOffsetFetchEvent.future(); - - try { - wakeupTrigger.setActiveTask(future); - final Map offsets = ConsumerUtils.getResult(future, timer); - - // Clear the pending event once its result is successfully retrieved. - pendingOffsetFetchEvent = null; - - refreshCommittedOffsets(offsets, metadata, subscriptions); - return true; - } catch (TimeoutException e) { - log.debug( - "The committed offsets for the following partition(s) could not be refreshed within the timeout: {} ", - initializingPartitions - ); - return false; - } catch (InterruptException e) { - throw e; - } catch (Throwable t) { - pendingOffsetFetchEvent = null; - throw ConsumerUtils.maybeWrapAsKafkaException(t); - } finally { - wakeupTrigger.clearTask(); - } - } - - /** - * This determines if the {@link #pendingOffsetFetchEvent pending offset fetch event} can be reused. Reuse - * is only possible if all the following conditions are true: - * - *
    - *
  • A pending offset fetch event exists
  • - *
  • The partition set of the pending offset fetch event is the same as the given partition set
  • - *
  • The pending offset fetch event has not expired
  • - *
- */ - private boolean canReusePendingOffsetFetchEvent(Set partitions) { - if (pendingOffsetFetchEvent == null) - return false; - - if (!pendingOffsetFetchEvent.partitions().equals(partitions)) - return false; - - return pendingOffsetFetchEvent.deadlineMs() > time.milliseconds(); - } - private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) { if (offsetAndMetadata != null) offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch)); @@ -2002,10 +1905,6 @@ SubscriptionState subscriptions() { return subscriptions; } - boolean hasPendingOffsetFetchEvent() { - return pendingOffsetFetchEvent != null; - } - private void maybeUpdateSubscriptionMetadata() { if (this.metadataVersionSnapshot < metadata.updateVersion()) { this.metadataVersionSnapshot = metadata.updateVersion(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java index 89f3fefae2bb6..113b8e0b9d6df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java @@ -215,9 +215,9 @@ public static void refreshCommittedOffsets(final Map T getResult(Future future, Timer timer) { + public static T getResult(Future future, long timeoutMs) { try { - return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); + return future.get(timeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { if (e.getCause() instanceof IllegalStateException) throw (IllegalStateException) e.getCause(); @@ -229,6 +229,10 @@ public static T getResult(Future future, Timer timer) { } } + public static T getResult(Future future, Timer timer) { + return getResult(future, timer.remainingMs()); + } + public static T getResult(Future future) { try { return future.get(); diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java index c219f1d98e77b..7d8eea269a4b7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java @@ -21,16 +21,17 @@ import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.StaleMetadataException; import org.apache.kafka.clients.consumer.LogTruncationException; +import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData; import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult; -import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; -import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.ClusterResourceListener; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.message.ListOffsetsRequestData; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ListOffsetsRequest; @@ -48,14 +49,18 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import 
java.util.stream.Collectors; import static java.util.Objects.requireNonNull; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets; import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.hasUsableOffsetForLeaderEpochVersion; import static org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupFetchPositionsByLeader; @@ -86,7 +91,23 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis private final Time time; private final ApiVersions apiVersions; private final NetworkClientDelegate networkClientDelegate; - private final BackgroundEventHandler backgroundEventHandler; + private final CommitRequestManager commitRequestManager; + private final long defaultApiTimeoutMs; + + /** + * Exception that occurred while updating positions after the triggering event had already + * expired. It will be propagated and cleared on the next call to update fetch positions. + */ + private final AtomicReference cachedUpdatePositionsException = new AtomicReference<>(); + + /** + * This holds the last OffsetFetch request triggered to retrieve committed offsets to update + * fetch positions that hasn't completed yet. When a response is received, it's used to + * update the fetch positions and the pendingOffsetFetchEvent is cleared. 
If the update fetch + * positions attempt runs out of time before this OffsetFetch gets a response, it will be + * kept to be used on the next attempt to update fetch positions (if partitions remain the same) + */ + private PendingFetchCommittedRequest pendingOffsetFetchEvent; @SuppressWarnings("this-escape") public OffsetsRequestManager(final SubscriptionState subscriptionState, @@ -95,9 +116,10 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState, final Time time, final long retryBackoffMs, final long requestTimeoutMs, + final long defaultApiTimeoutMs, final ApiVersions apiVersions, final NetworkClientDelegate networkClientDelegate, - final BackgroundEventHandler backgroundEventHandler, + final CommitRequestManager commitRequestManager, final LogContext logContext) { requireNonNull(subscriptionState); requireNonNull(metadata); @@ -105,7 +127,6 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState, requireNonNull(time); requireNonNull(apiVersions); requireNonNull(networkClientDelegate); - requireNonNull(backgroundEventHandler); requireNonNull(logContext); this.metadata = metadata; @@ -116,15 +137,27 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState, this.subscriptionState = subscriptionState; this.time = time; this.requestTimeoutMs = requestTimeoutMs; + this.defaultApiTimeoutMs = defaultApiTimeoutMs; this.apiVersions = apiVersions; this.networkClientDelegate = networkClientDelegate; - this.backgroundEventHandler = backgroundEventHandler; this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState, time, retryBackoffMs, apiVersions); // Register the cluster metadata update callback. Note this only relies on the // requestsToRetry initialized above, and won't be invoked until all managers are // initialized and the network thread started. 
this.metadata.addClusterUpdateListener(this); + this.commitRequestManager = commitRequestManager; + } + + private static class PendingFetchCommittedRequest { + final Set requestedPartitions; + final CompletableFuture> result; + + private PendingFetchCommittedRequest(final Set requestedPartitions, + final CompletableFuture> result) { + this.requestedPartitions = Objects.requireNonNull(requestedPartitions); + this.result = Objects.requireNonNull(result); + } } /** @@ -182,26 +215,265 @@ public CompletableFuture> fetchO result.fetchedOffsets)); } + /** + * Update fetch positions for assigned partitions that do not have a position. This will: + *
    + *
  • check if all assigned partitions already have fetch positions and return right away if that's the case
  • + *
  • trigger an async request to validate positions (detect log truncation)
  • + *
  • fetch committed offsets if enabled, and use the response to update the positions
  • + *
  • fetch partition offsets for partitions that may still require a position, and use the response to + * update the positions
  • + *
+ * + * @param deadlineMs Time in milliseconds when the triggering application event expires. Any error received after + * this will be saved, and used to complete the result exceptionally on the next call to this + * function. + * @return Future that will complete with a boolean indicating if all assigned partitions have positions (based + * on {@link SubscriptionState#hasAllFetchPositions()}). It will complete immediately, with true, if all positions + * are already available. If some positions are missing, the future will complete once the offsets are retrieved and positions are updated. + */ + public CompletableFuture updateFetchPositions(long deadlineMs) { + CompletableFuture result = new CompletableFuture<>(); + + try { + if (maybeCompleteWithPreviousException(result)) { + return result; + } + + validatePositionsIfNeeded(); + + if (subscriptionState.hasAllFetchPositions()) { + // All positions are already available + result.complete(true); + return result; + } + + // Some positions are missing, so trigger requests to fetch offsets and update them. + updatePositionsWithOffsets(deadlineMs).whenComplete((__, error) -> { + if (error != null) { + result.completeExceptionally(error); + } else { + result.complete(subscriptionState.hasAllFetchPositions()); + } + }); + + } catch (Exception e) { + result.completeExceptionally(maybeWrapAsKafkaException(e)); + } + return result; + } + + private boolean maybeCompleteWithPreviousException(CompletableFuture result) { + Throwable cachedException = cachedUpdatePositionsException.getAndSet(null); + if (cachedException != null) { + result.completeExceptionally(cachedException); + return true; + } + return false; + } + + /** + * Generate requests to fetch offsets and update positions once a response is received. This will first attempt + * to use the committed offsets if available. If no committed offsets available, it will use the partition + * offsets retrieved from the leader. 
+ */ + private CompletableFuture updatePositionsWithOffsets(long deadlineMs) { + CompletableFuture result = new CompletableFuture<>(); + + cacheExceptionIfEventExpired(result, deadlineMs); + + CompletableFuture updatePositions; + if (commitRequestManager != null) { + CompletableFuture refreshWithCommittedOffsets = initWithCommittedOffsetsIfNeeded(deadlineMs); + + // Reset positions for all partitions that may still require it (or that are awaiting reset) + updatePositions = refreshWithCommittedOffsets.thenCompose(__ -> initWithPartitionOffsetsIfNeeded()); + + } else { + updatePositions = initWithPartitionOffsetsIfNeeded(); + } + + updatePositions.whenComplete((__, resetError) -> { + if (resetError == null) { + result.complete(null); + } else { + result.completeExceptionally(resetError); + } + }); + + return result; + } + + /** + * Save exception that may occur while updating fetch positions. Note that since the update fetch positions + * is triggered asynchronously, errors may be found when the triggering CheckAndUpdatePositionsEvent has already + * expired. In that case, the exception is saved in memory, to be thrown when processing the following + * CheckAndUpdatePositionsEvent. + * + * @param result Update fetch positions future to get the exception from (if any) + * @param deadlineMs Deadline of the triggering application event, used to identify if the event has already + * expired when the error in the result future occurs. + */ + private void cacheExceptionIfEventExpired(CompletableFuture result, long deadlineMs) { + result.whenComplete((__, error) -> { + boolean updatePositionsExpired = time.milliseconds() >= deadlineMs; + if (error != null && updatePositionsExpired) { + cachedUpdatePositionsException.set(error); + } + }); + } + + /** + * If there are partitions still needing a position and a reset policy is defined, request reset using the + * default policy. 
+ * + * @return Future that will complete when the reset operation completes retrieving the offsets and setting + * positions in the subscription state using them. + * @throws NoOffsetForPartitionException If no reset strategy is configured. + */ + private CompletableFuture initWithPartitionOffsetsIfNeeded() { + CompletableFuture result = new CompletableFuture<>(); + try { + // Mark partitions that need reset, using the configured reset strategy. If no + // strategy is defined, this will raise a NoOffsetForPartitionException exception. + subscriptionState.resetInitializingPositions(); + } catch (Exception e) { + result.completeExceptionally(e); + return result; + } + + // For partitions awaiting reset, generate a ListOffset request to retrieve the partition + // offsets according to the strategy (ex. earliest, latest), and update the positions. + return resetPositionsIfNeeded(); + } + + /** + * Fetch the committed offsets for partitions that require initialization. This will trigger an OffsetFetch + * request and update positions in the subscription state once a response is received. + * + * @throws TimeoutException If offsets could not be retrieved within the timeout + */ + private CompletableFuture initWithCommittedOffsetsIfNeeded(long deadlineMs) { + final Set initializingPartitions = subscriptionState.initializingPartitions(); + + if (initializingPartitions.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + log.debug("Refreshing committed offsets for partitions {}", initializingPartitions); + CompletableFuture result = new CompletableFuture<>(); + + // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle + // this case, on the first attempt to fetch the committed offsets, an OffsetFetch request is generated + // (with potentially a longer timeout) and stored. 
The request is used for the first attempt, but in the + // case it times out, subsequent attempts will also use the request in order to wait for the results. + if (!canReusePendingOffsetFetchEvent(initializingPartitions)) { + // Generate a new OffsetFetch request and update positions when a response is received + final long fetchCommittedDeadlineMs = Math.max(deadlineMs, time.milliseconds() + defaultApiTimeoutMs); + CompletableFuture> fetchOffsets = + commitRequestManager.fetchOffsets(initializingPartitions, fetchCommittedDeadlineMs); + CompletableFuture> fetchOffsetsAndRefresh = + fetchOffsets.whenComplete((offsets, error) -> { + pendingOffsetFetchEvent = null; + // Update positions with the retrieved offsets + refreshOffsets(offsets, error, result); + }); + pendingOffsetFetchEvent = new PendingFetchCommittedRequest(initializingPartitions, fetchOffsetsAndRefresh); + } else { + // Reuse pending OffsetFetch request that will complete when positions are refreshed with the committed offsets retrieved + pendingOffsetFetchEvent.result.whenComplete((__, error) -> { + if (error == null) { + result.complete(null); + } else { + result.completeExceptionally(error); + } + }); + } + + return result; + } + + /** + * Use the given committed offsets to update positions for partitions that still require it. + * + * @param offsets Committed offsets to use to update positions for initializing partitions. + * @param error Error received in response to the OffsetFetch request. Will be null if the request was successful. + * @param result Future to complete once all positions have been updated with the given committed offsets + */ + private void refreshOffsets(final Map offsets, + final Throwable error, + final CompletableFuture result) { + if (error == null) { + + // Ensure we only set positions for the partitions that still require one (ex. 
some partitions may have + // been assigned a position manually) + Map offsetsToApply = offsetsForInitializingPartitions(offsets); + + refreshCommittedOffsets(offsetsToApply, metadata, subscriptionState); + + result.complete(null); + + } else { + log.error("Error fetching committed offsets to update positions", error); + result.completeExceptionally(error); + } + } + + /** + * Get the offsets, from the given collection, that belong to partitions that still require a position (partitions + * that are initializing). This is expected to be used to filter out offsets that were retrieved for partitions + * that do not need a position anymore. + * + * @param offsets Offsets per partition + * @return Subset of the offsets associated to partitions that are still initializing + */ + private Map offsetsForInitializingPartitions(Map offsets) { + Set currentlyInitializingPartitions = subscriptionState.initializingPartitions(); + Map result = new HashMap<>(); + offsets.forEach((key, value) -> { + if (currentlyInitializingPartitions.contains(key)) { + result.put(key, value); + } + }); + return result; + } + + /** + * This determines if the {@link #pendingOffsetFetchEvent pending offset fetch event} can be reused. Reuse + * is only possible if all the following conditions are true: + * + *
    + *
  • A pending offset fetch event exists
  • + *
  • The partition set of the pending offset fetch event is the same as the given partitions
  • + *
+ */ + private boolean canReusePendingOffsetFetchEvent(Set partitions) { + if (pendingOffsetFetchEvent == null) { + return false; + } + + return pendingOffsetFetchEvent.requestedPartitions.equals(partitions); + } + /** * Reset offsets for all assigned partitions that require it. Offsets will be reset * with timestamps according to the reset strategy defined for each partition. This will * generate ListOffsets requests for the partitions and timestamps, and enqueue them to be sent * on the next call to {@link #poll(long)}. - * *

- * * When a response is received, positions are updated in-memory, on the subscription state. If * an error is received in the response, it will be saved to be thrown on the next call to * this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException}) */ - public CompletableFuture resetPositionsIfNeeded() { + CompletableFuture resetPositionsIfNeeded() { Map offsetResetTimestamps; try { offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp(); } catch (Exception e) { - backgroundEventHandler.add(new ErrorEvent(e)); - return CompletableFuture.completedFuture(null); + CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally(e); + return result; } if (offsetResetTimestamps.isEmpty()) @@ -218,18 +490,17 @@ public CompletableFuture resetPositionsIfNeeded() { * *

* - * When a response is received, positions are validated and, if a log truncation is - * detected, a {@link LogTruncationException} will be saved in memory, to be thrown on the + * When a response is received, positions are validated and, if a log truncation is detected, a + * {@link LogTruncationException} will be saved in memory in cachedUpdatePositionsException, to be thrown on the * next call to this function. */ - public CompletableFuture validatePositionsIfNeeded() { - Map partitionsToValidate = - offsetFetcherUtils.getPartitionsToValidate(); + void validatePositionsIfNeeded() { + Map partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate(); if (partitionsToValidate.isEmpty()) { - return CompletableFuture.completedFuture(null); + return; } - return sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate); + sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate); } /** @@ -435,19 +706,15 @@ private CompletableFuture sendListOffsetsRequestsAndResetPositions( * This also adds the request to the list of unsentRequests. * * @param partitionsToValidate a map of topic-partition positions to validate - * @return A {@link CompletableFuture} which completes when the requests are - * complete. 
*/ - private CompletableFuture sendOffsetsForLeaderEpochRequestsAndValidatePositions( + private void sendOffsetsForLeaderEpochRequestsAndValidatePositions( Map partitionsToValidate) { final Map> regrouped = regroupFetchPositionsByLeader(partitionsToValidate); long nextResetTimeMs = time.milliseconds() + requestTimeoutMs; - final AtomicInteger expectedResponses = new AtomicInteger(0); - final CompletableFuture globalResult = new CompletableFuture<>(); final List unsentRequests = new ArrayList<>(); regrouped.forEach((node, fetchPositions) -> { @@ -491,20 +758,10 @@ private CompletableFuture sendOffsetsForLeaderEpochRequestsAndValidatePosi } offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e); } - if (expectedResponses.decrementAndGet() == 0) { - globalResult.complete(null); - } }); }); - if (unsentRequests.isEmpty()) { - globalResult.complete(null); - } else { - expectedResponses.set(unsentRequests.size()); - requestsToSend.addAll(unsentRequests); - } - - return globalResult; + requestsToSend.addAll(unsentRequests); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 9925e21571ada..c4aecf75c1063 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -166,16 +166,8 @@ protected RequestManagers create() { long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); final int requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); - final OffsetsRequestManager listOffsets = new OffsetsRequestManager(subscriptions, - metadata, - fetchConfig.isolationLevel, - time, - retryBackoffMs, - requestTimeoutMs, - apiVersions, - networkClientDelegate, - 
backgroundEventHandler, - logContext); + final int defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); + final FetchRequestManager fetch = new FetchRequestManager(logContext, time, metadata, @@ -192,7 +184,7 @@ protected RequestManagers create() { ConsumerHeartbeatRequestManager heartbeatRequestManager = null; ConsumerMembershipManager membershipManager = null; CoordinatorRequestManager coordinator = null; - CommitRequestManager commit = null; + CommitRequestManager commitRequestManager = null; if (groupRebalanceConfig != null && groupRebalanceConfig.groupId != null) { Optional serverAssignor = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); @@ -202,7 +194,7 @@ protected RequestManagers create() { retryBackoffMaxMs, backgroundEventHandler, groupRebalanceConfig.groupId); - commit = new CommitRequestManager( + commitRequestManager = new CommitRequestManager( time, logContext, subscriptions, @@ -218,14 +210,14 @@ protected RequestManagers create() { groupRebalanceConfig.rebalanceTimeoutMs, serverAssignor, subscriptions, - commit, + commitRequestManager, metadata, logContext, clientTelemetryReporter, backgroundEventHandler, time, metrics); - membershipManager.registerStateListener(commit); + membershipManager.registerStateListener(commitRequestManager); membershipManager.registerStateListener(applicationThreadMemberStateListener); heartbeatRequestManager = new ConsumerHeartbeatRequestManager( logContext, @@ -238,13 +230,25 @@ protected RequestManagers create() { metrics); } + final OffsetsRequestManager listOffsets = new OffsetsRequestManager(subscriptions, + metadata, + fetchConfig.isolationLevel, + time, + retryBackoffMs, + requestTimeoutMs, + defaultApiTimeoutMs, + apiVersions, + networkClientDelegate, + commitRequestManager, + logContext); + return new RequestManagers( logContext, listOffsets, topic, fetch, Optional.ofNullable(coordinator), - Optional.ofNullable(commit), + 
Optional.ofNullable(commitRequestManager), Optional.ofNullable(heartbeatRequestManager), Optional.ofNullable(membershipManager) ); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index 4b0584b5bb8c1..15525957fc136 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -30,7 +30,7 @@ public abstract class ApplicationEvent { public enum Type { COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, - LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE, + LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE, UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, COMMIT_ON_CLOSE, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java index d8fc13830f716..0baafcd3038d1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; import org.apache.kafka.clients.consumer.internals.RequestManagers; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.utils.LogContext; import 
org.apache.kafka.common.utils.Time; @@ -110,6 +111,12 @@ public long maximumTimeToWait() { public T addAndGet(final CompletableApplicationEvent event) { Objects.requireNonNull(event, "CompletableApplicationEvent provided to addAndGet must be non-null"); add(event); + // Check if the thread was interrupted before we start waiting, to ensure that we + // propagate the exception even if we end up not having to wait (the event could complete + // between the time it's added and the time we attempt to getResult) + if (Thread.interrupted()) { + throw new InterruptException("Interrupted waiting for results for application event " + event); + } return ConsumerUtils.getResult(event.future()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 6ce8737c78a5e..6ae6c23f6ee1b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -99,12 +99,8 @@ public void process(ApplicationEvent event) { process((ListOffsetsEvent) event); return; - case RESET_POSITIONS: - process((ResetPositionsEvent) event); - return; - - case VALIDATE_POSITIONS: - process((ValidatePositionsEvent) event); + case CHECK_AND_UPDATE_POSITIONS: + process((CheckAndUpdatePositionsEvent) event); return; case SUBSCRIPTION_CHANGE: @@ -259,13 +255,12 @@ private void process(final UnsubscribeEvent event) { } } - private void process(final ResetPositionsEvent event) { - CompletableFuture future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded(); - future.whenComplete(complete(event.future())); - } - - private void process(final ValidatePositionsEvent event) { - CompletableFuture future = requestManagers.offsetsRequestManager.validatePositionsIfNeeded(); + /** 
+ * Check if all assigned partitions have fetch positions. If there are missing positions, fetch offsets and use + * them to update positions in the subscription state. + */ + private void process(final CheckAndUpdatePositionsEvent event) { + CompletableFuture future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs()); future.whenComplete(complete(event.future())); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java similarity index 52% rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java index 86dbb80c0f0ac..2c7fdd7464283 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java @@ -17,14 +17,19 @@ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.clients.consumer.internals.SubscriptionState; + /** - * Event for resetting offsets for all assigned partitions that require it. This is an - * asynchronous event that generates ListOffsets requests, and completes by updating in-memory - * positions when responses are received. + * Event to check if all assigned partitions have fetch positions. If there are positions missing, it will fetch + * offsets and update positions when it gets them. This will first attempt to use the committed offsets if available. If + * no committed offsets available, it will use the partition offsets retrieved from the leader. + *

+ * The event completes with a boolean indicating if all assigned partitions have valid fetch positions + * (based on {@link SubscriptionState#hasAllFetchPositions()}). */ -public class ResetPositionsEvent extends CompletableApplicationEvent { +public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent { - public ResetPositionsEvent(final long deadlineMs) { - super(Type.RESET_POSITIONS, deadlineMs); + public CheckAndUpdatePositionsEvent(long deadlineMs) { + super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs); } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java deleted file mode 100644 index a93ff9859a58e..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.clients.consumer.internals.events; - -/** - * Event for validating offsets for all assigned partitions for which a leader change has been - * detected. 
This is an asynchronous event that generates OffsetForLeaderEpoch requests, and - * completes by validating in-memory positions against the offsets received in the responses. - */ -public class ValidatePositionsEvent extends CompletableApplicationEvent { - - public ValidatePositionsEvent(final long deadlineMs) { - super(Type.VALIDATE_POSITIONS, deadlineMs); - } -} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 2526b6447a4c6..ce7cd17c9965a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent; import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent; import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; @@ -44,12 +45,10 @@ import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; -import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent; import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent; import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent; 
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; -import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -386,8 +385,7 @@ public void testWakeupBeforeCallingPoll() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); completeCommitSyncApplicationEventSuccessfully(); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); @@ -409,8 +407,7 @@ public void testWakeupAfterEmptyFetch() { consumer.wakeup(); return Fetch.empty(); }).doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); completeCommitSyncApplicationEventSuccessfully(); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); @@ -434,8 +431,7 @@ public void testWakeupAfterNonEmptyFetch() { consumer.wakeup(); return Fetch.forPartition(tp, records, true); }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class)); - Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); 
completeCommitSyncApplicationEventSuccessfully(); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); @@ -454,6 +450,7 @@ public void testCommitInRebalanceCallback() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doAnswer(invocation -> Fetch.empty()).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class)); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); SortedSet sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); sortedPartitions.add(tp); CompletableBackgroundEvent e = new ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, sortedPartitions); @@ -496,8 +493,7 @@ public void testSubscriptionRegexEvalOnPollOnlyIfMetadataChanges() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - Map offsets = Collections.singletonMap(tp, new OffsetAndMetadata(1)); - completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); doReturn(cluster).when(metadata).fetch(); doReturn(Collections.singleton(topicName)).when(cluster).topics(); @@ -529,8 +525,7 @@ public void testClearWakeupTriggerAfterPoll() { ); doReturn(Fetch.forPartition(tp, records, true)) .when(fetchCollector).collectFetch(any(FetchBuffer.class)); - Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); completeCommitSyncApplicationEventSuccessfully(); 
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); @@ -575,157 +570,6 @@ public void testPollLongThrowsException() { "This method is deprecated and will be removed in the next major release.", e.getMessage()); } - @Test - public void testOffsetFetchStoresPendingEvent() { - consumer = newConsumer(); - long timeoutMs = 0; - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); - - // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within the - // timeout, leaving a pending fetch. - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event = getLastEnqueuedEvent(); - assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs))); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - clearInvocations(applicationEventHandler); - - // For the second attempt, the event is reused, so first verify that another FetchCommittedOffsetsEvent - // was not enqueued. On this attempt the Future returns successfully, clearing the pending fetch. - event.future().complete(Collections.emptyMap()); - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler, never()).add(any(FetchCommittedOffsetsEvent.class)); - assertDoesNotThrow(() -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs))); - assertFalse(consumer.hasPendingOffsetFetchEvent()); - } - - @Test - public void testOffsetFetchDoesNotReuseMismatchedPendingEvent() { - consumer = newConsumer(); - long timeoutMs = 0; - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - - // The first attempt at poll() retrieves data for partition 0 of the topic. 
poll() creates an event, - // enqueues it, but its Future does not complete within the timeout, leaving a pending fetch. - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event1 = getLastEnqueuedEvent(); - assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs))); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - clearInvocations(applicationEventHandler); - - // For the second attempt, the set of partitions is reassigned, causing the pending offset to be replaced. - // Verify that another FetchCommittedOffsetsEvent is enqueued. - consumer.assign(Collections.singleton(new TopicPartition("topic1", 1))); - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event2 = getLastEnqueuedEvent(); - assertNotEquals(event1, event2); - assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs))); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - clearInvocations(applicationEventHandler); - - // For the third attempt, the event from attempt 2 is reused, so there should not have been another - // FetchCommittedOffsetsEvent enqueued. The Future is completed to make it return successfully in poll(). - // This will finally clear out the pending fetch. 
- event2.future().complete(Collections.emptyMap()); - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler, never()).add(any(FetchCommittedOffsetsEvent.class)); - assertDoesNotThrow(() -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs))); - assertFalse(consumer.hasPendingOffsetFetchEvent()); - } - - @Test - public void testOffsetFetchDoesNotReuseExpiredPendingEvent() { - consumer = newConsumer(); - long timeoutMs = 0; - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); - - // The first attempt at poll() creates an event, enqueues it, but its Future does not complete within - // the timeout, leaving a pending fetch. - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event1 = getLastEnqueuedEvent(); - assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs))); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - clearInvocations(applicationEventHandler); - - // Sleep past the event's expiration, causing the poll() to *not* reuse the pending fetch. A new event - // is created and added to the application event queue. 
- time.sleep(event1.deadlineMs() - time.milliseconds()); - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event2 = getLastEnqueuedEvent(); - assertNotEquals(event1, event2); - assertThrows(TimeoutException.class, () -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs))); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - } - - @Test - public void testOffsetFetchTimeoutExceptionKeepsPendingEvent() { - consumer = newConsumer(); - long timeoutMs = 0; - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); - - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event = getLastEnqueuedEvent(); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - event.future().completeExceptionally(new TimeoutException("Test error")); - assertDoesNotThrow(() -> consumer.poll(Duration.ofMillis(timeoutMs))); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - } - - @Test - public void testOffsetFetchInterruptExceptionKeepsPendingEvent() { - consumer = newConsumer(); - long timeoutMs = 0; - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); - - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event = getLastEnqueuedEvent(); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - event.future().completeExceptionally(new InterruptException("Test error")); - assertThrows(InterruptException.class, () -> consumer.poll(Duration.ofMillis(timeoutMs))); - 
assertTrue(Thread.interrupted()); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - } - - @Test - public void testOffsetFetchUnexpectedExceptionClearsPendingEvent() { - consumer = newConsumer(); - long timeoutMs = 0; - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeAssignmentChangeEventSuccessfully(); - consumer.assign(Collections.singleton(new TopicPartition("topic1", 0))); - - consumer.poll(Duration.ofMillis(timeoutMs)); - verify(applicationEventHandler).add(any(FetchCommittedOffsetsEvent.class)); - CompletableApplicationEvent> event = getLastEnqueuedEvent(); - assertTrue(consumer.hasPendingOffsetFetchEvent()); - - event.future().completeExceptionally(new NullPointerException("Test error")); - assertThrows(KafkaException.class, () -> consumer.poll(Duration.ofMillis(timeoutMs))); - assertFalse(consumer.hasPendingOffsetFetchEvent()); - } - @Test public void testCommitSyncLeaderEpochUpdate() { consumer = newConsumer(); @@ -801,7 +645,6 @@ public void testCommitAsyncTriggersFencedExceptionFromCommitAsync() { consumer = newConsumer(config); completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception()); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap()); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); final TopicPartition tp = new TopicPartition("foo", 0); completeAssignmentChangeEventSuccessfully(); @@ -825,7 +668,6 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { consumer = newConsumer(config); completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception()); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap()); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); final TopicPartition tp = 
new TopicPartition("foo", 0); completeAssignmentChangeEventSuccessfully(); @@ -925,7 +767,6 @@ public void testPollTriggersFencedExceptionFromCommitAsync() { consumer = newConsumer(config); completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception()); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap()); doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); final TopicPartition tp = new TopicPartition("foo", 0); completeAssignmentChangeEventSuccessfully(); @@ -945,7 +786,7 @@ public void testEnsurePollExecutedCommitAsyncCallbacks() { MockCommitCallback callback = new MockCommitCallback(); completeCommitAsyncApplicationEventSuccessfully(); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap()); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); completeAssignmentChangeEventSuccessfully(); consumer.assign(Collections.singleton(new TopicPartition("foo", 0))); @@ -1443,25 +1284,6 @@ public void testNoInterceptorCommitAsyncFailed() { assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); } - @Test - public void testRefreshCommittedOffsetsSuccess() { - consumer = newConsumer(); - completeCommitSyncApplicationEventSuccessfully(); - TopicPartition partition = new TopicPartition("t1", 1); - Set partitions = Collections.singleton(partition); - Map committedOffsets = Collections.singletonMap(partition, new OffsetAndMetadata(10L)); - testRefreshCommittedOffsetsSuccess(partitions, committedOffsets); - } - - @Test - public void testRefreshCommittedOffsetsSuccessButNoCommittedOffsetsFound() { - consumer = newConsumer(); - TopicPartition partition = new TopicPartition("t1", 1); - Set partitions = Collections.singleton(partition); - Map committedOffsets = 
Collections.emptyMap(); - testRefreshCommittedOffsetsSuccess(partitions, committedOffsets); - } - @Test public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() { consumer = newConsumer(); @@ -1694,6 +1516,7 @@ public void testListenerCallbacksInvoke(List consumer.poll(Duration.ZERO)); } @@ -1865,6 +1688,7 @@ public void testEnsurePollEventSentOnConsumerPoll() { doAnswer(invocation -> Fetch.forPartition(tp, records, true)) .when(fetchCollector) .collectFetch(Mockito.any(FetchBuffer.class)); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); consumer.subscribe(singletonList("topic1")); consumer.poll(Duration.ofMillis(100)); @@ -1880,6 +1704,7 @@ private Properties requiredConsumerConfigAndGroupId(final String groupId) { private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean committedOffsetsEnabled) { completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException()); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(new TopicPartition("t1", 1))); @@ -1887,40 +1712,7 @@ private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean co consumer.poll(Duration.ZERO); verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class)); - - if (committedOffsetsEnabled) { - // Verify there was an FetchCommittedOffsets event and no ResetPositions event - verify(applicationEventHandler, atLeast(1)) - .add(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class)); - verify(applicationEventHandler, never()) - .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class)); - } else { - // Verify there was not any FetchCommittedOffsets event but there should be a ResetPositions - verify(applicationEventHandler, never()) - 
.add(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class)); - verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class)); - } - } - - private void testRefreshCommittedOffsetsSuccess(Set partitions, - Map committedOffsets) { - completeFetchedCommittedOffsetApplicationEventSuccessfully(committedOffsets); - doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); - - completeAssignmentChangeEventSuccessfully(); - consumer.assign(partitions); - - consumer.poll(Duration.ZERO); - - verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class)); - verify(applicationEventHandler, atLeast(1)) - .add(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class)); - verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class)); + .addAndGet(ArgumentMatchers.isA(CheckAndUpdatePositionsEvent.class)); } @Test @@ -1947,6 +1739,7 @@ public void testLongPollWaitIsLimited() { }).doAnswer(invocation -> Fetch.forPartition(tp, records, true) ).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); // And then poll for up to 10000ms, which should return 2 records without timing out ConsumerRecords returnedRecords = consumer.poll(Duration.ofMillis(10000)); @@ -2041,8 +1834,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted() { final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); - Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); 
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); @@ -2079,6 +1871,7 @@ void testReaperInvokedInPoll() { consumer = newConsumer(); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); consumer.subscribe(Collections.singletonList("topic")); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); consumer.poll(Duration.ZERO); verify(backgroundEventReaper).reap(time.milliseconds()); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java index c28202efa01cc..e270c039eeb4a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java @@ -20,10 +20,8 @@ import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; -import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; -import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.IsolationLevel; @@ -43,7 +41,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.junit.jupiter.api.Assertions; 
import org.junit.jupiter.api.BeforeEach; @@ -60,25 +58,25 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.BlockingQueue; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -90,9 +88,9 @@ public class OffsetsRequestManagerTest { private OffsetsRequestManager requestManager; private ConsumerMetadata metadata; private SubscriptionState subscriptionState; - private MockTime time; + private final Time time = mock(Time.class); private ApiVersions apiVersions; - private BlockingQueue backgroundEventQueue; + private final CommitRequestManager commitRequestManager = mock(CommitRequestManager.class); private static final String TEST_TOPIC = "t1"; private static final TopicPartition TEST_PARTITION_1 = new TopicPartition(TEST_TOPIC, 1); private static final TopicPartition TEST_PARTITION_2 = new TopicPartition(TEST_TOPIC, 2); @@ -101,15 +99,13 
@@ public class OffsetsRequestManagerTest { private static final IsolationLevel DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_COMMITTED; private static final int RETRY_BACKOFF_MS = 500; private static final int REQUEST_TIMEOUT_MS = 500; + private static final int DEFAULT_API_TIMEOUT_MS = 500; @BeforeEach public void setup() { LogContext logContext = new LogContext(); - backgroundEventQueue = new LinkedBlockingQueue<>(); - BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue); metadata = mock(ConsumerMetadata.class); subscriptionState = mock(SubscriptionState.class); - time = new MockTime(0); apiVersions = mock(ApiVersions.class); requestManager = new OffsetsRequestManager( subscriptionState, @@ -118,9 +114,10 @@ public void setup() { time, RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS, + DEFAULT_API_TIMEOUT_MS, apiVersions, mock(NetworkClientDelegate.class), - backgroundEventHandler, + commitRequestManager, logContext ); } @@ -541,39 +538,29 @@ public void testResetOffsetsAuthorizationFailure() { when(subscriptionState.resetStrategy(any())).thenReturn(OffsetResetStrategy.EARLIEST); mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); - requestManager.resetPositionsIfNeeded(); + CompletableFuture resetResult = requestManager.resetPositionsIfNeeded(); // Reset positions response with TopicAuthorizationException NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds()); NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0); + assertFalse(resetResult.isDone()); Errors topicAuthorizationFailedError = Errors.TOPIC_AUTHORIZATION_FAILED; ClientResponse clientResponse = buildClientResponseWithErrors( unsentRequest, Collections.singletonMap(TEST_PARTITION_1, topicAuthorizationFailedError)); clientResponse.onComplete(); assertTrue(unsentRequest.future().isDone()); + assertTrue(resetResult.isDone()); assertFalse(unsentRequest.future().isCompletedExceptionally()); 
verify(subscriptionState).requestFailed(any(), anyLong()); verify(metadata).requestUpdate(false); - // Following resetPositions should enqueue the previous exception in the background event queue - // without performing any request - assertDoesNotThrow(() -> requestManager.resetPositionsIfNeeded()); + // Following resetPositions should throw the exception + CompletableFuture nextReset = assertDoesNotThrow(() -> requestManager.resetPositionsIfNeeded()); assertEquals(0, requestManager.requestsToSend()); - - // Check that the event was enqueued during resetPositionsIfNeeded - assertEquals(1, backgroundEventQueue.size()); - BackgroundEvent event = backgroundEventQueue.poll(); - assertNotNull(event); - - // Check that the event itself is of the expected type - assertInstanceOf(ErrorEvent.class, event); - ErrorEvent errorEvent = (ErrorEvent) event; - assertNotNull(errorEvent.error()); - - // Check that the error held in the event is of the expected type - assertInstanceOf(topicAuthorizationFailedError.exception().getClass(), errorEvent.error()); + assertTrue(nextReset.isCompletedExceptionally()); + assertFutureThrows(nextReset, TopicAuthorizationException.class); } @Test @@ -672,6 +659,107 @@ public void testValidatePositionsAbortIfNoApiVersionsToCheckAgainstThenRecovers( assertEquals(1, requestManager.requestsToSend(), "Invalid request count"); } + @Test + public void testUpdatePositionsWithCommittedOffsets() { + long internalFetchCommittedTimeout = time.milliseconds() + DEFAULT_API_TIMEOUT_MS; + TopicPartition tp1 = new TopicPartition("topic1", 1); + Set initPartitions1 = Collections.singleton(tp1); + Metadata.LeaderAndEpoch leaderAndEpoch = testLeaderEpoch(LEADER_1, Optional.of(1)); + + // tp1 assigned and requires a position + mockAssignedPartitionsMissingPositions(initPartitions1, initPartitions1, leaderAndEpoch); + + // Call to updateFetchPositions. 
Should send an OffsetFetch request and use the response to set positions + CompletableFuture> fetchResult = new CompletableFuture<>(); + when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult); + CompletableFuture updatePositions1 = requestManager.updateFetchPositions(time.milliseconds()); + assertFalse(updatePositions1.isDone(), "Update positions should wait for the OffsetFetch request"); + verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout); + + // Receive response with committed offsets. Should complete the updatePositions operation (the set + // of initializing partitions hasn't changed) + when(subscriptionState.initializingPartitions()).thenReturn(initPartitions1); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(10, Optional.of(1), ""); + fetchResult.complete(Collections.singletonMap(tp1, offsetAndMetadata)); + + assertTrue(updatePositions1.isDone(), "Update positions should complete after the OffsetFetch response"); + SubscriptionState.FetchPosition expectedPosition = new SubscriptionState.FetchPosition( + offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(), leaderAndEpoch); + verify(subscriptionState).seekUnvalidated(tp1, expectedPosition); + } + + @Test + public void testUpdatePositionsWithCommittedOffsetsReusesRequest() { + long internalFetchCommittedTimeout = time.milliseconds() + DEFAULT_API_TIMEOUT_MS; + TopicPartition tp1 = new TopicPartition("topic1", 1); + Set initPartitions1 = Collections.singleton(tp1); + Metadata.LeaderAndEpoch leaderAndEpoch = testLeaderEpoch(LEADER_1, Optional.of(1)); + + // tp1 assigned and requires a position + mockAssignedPartitionsMissingPositions(initPartitions1, initPartitions1, leaderAndEpoch); + + // call to updateFetchPositions. 
Should send an OffsetFetch request + CompletableFuture> fetchResult = new CompletableFuture<>(); + when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult); + CompletableFuture updatePositions1 = requestManager.updateFetchPositions(time.milliseconds()); + assertFalse(updatePositions1.isDone(), "Update positions should wait for the OffsetFetch request"); + verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout); + clearInvocations(commitRequestManager); + + // Call to updateFetchPositions again with the same set of initializing partitions should reuse request + CompletableFuture updatePositions2 = requestManager.updateFetchPositions(time.milliseconds()); + verify(commitRequestManager, never()).fetchOffsets(initPartitions1, internalFetchCommittedTimeout); + + // Receive response with committed offsets, should complete both calls + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(10, Optional.of(1), ""); + fetchResult.complete(Collections.singletonMap(tp1, offsetAndMetadata)); + + assertTrue(updatePositions1.isDone()); + assertTrue(updatePositions2.isDone()); + SubscriptionState.FetchPosition expectedPosition = new SubscriptionState.FetchPosition( + offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(), leaderAndEpoch); + verify(subscriptionState).seekUnvalidated(tp1, expectedPosition); + } + + @Test + public void testUpdatePositionsDoesNotApplyOffsetsIfPartitionNotInitializingAnymore() { + long internalFetchCommittedTimeout = time.milliseconds() + DEFAULT_API_TIMEOUT_MS; + TopicPartition tp1 = new TopicPartition("topic1", 1); + Set initPartitions1 = Collections.singleton(tp1); + Metadata.LeaderAndEpoch leaderAndEpoch = testLeaderEpoch(LEADER_1, Optional.of(1)); + + // tp1 assigned and requires a position + mockAssignedPartitionsMissingPositions(initPartitions1, initPartitions1, leaderAndEpoch); + + // call to updateFetchPositions will trigger an OffsetFetch 
request for tp1 (won't complete just yet) + CompletableFuture> fetchResult = new CompletableFuture<>(); + when(commitRequestManager.fetchOffsets(initPartitions1, internalFetchCommittedTimeout)).thenReturn(fetchResult); + CompletableFuture updatePositions1 = requestManager.updateFetchPositions(time.milliseconds()); + assertFalse(updatePositions1.isDone()); + verify(commitRequestManager).fetchOffsets(initPartitions1, internalFetchCommittedTimeout); + clearInvocations(commitRequestManager); + + // tp1 does not require a position anymore (ex. removed from the assignment, or got a position manually via + // seek). When the OffsetFetch response is received, it should not update the position for tp1 to the + // committed offset + when(subscriptionState.initializingPartitions()).thenReturn(Collections.emptySet()); + fetchResult.complete(Collections.singletonMap(tp1, new OffsetAndMetadata(5))); + verify(subscriptionState, never()).seekUnvalidated(any(), any()); + } + + private void mockAssignedPartitionsMissingPositions(Set assignedPartitions, + Set initializingPartitions, + Metadata.LeaderAndEpoch leaderAndEpoch) { + when(subscriptionState.partitionsNeedingValidation(anyLong())).thenReturn(Collections.emptySet()); + assignedPartitions.forEach(tp -> { + when(subscriptionState.isAssigned(tp)).thenReturn(true); + when(metadata.currentLeader(tp)).thenReturn(leaderAndEpoch); + }); + + when(subscriptionState.hasAllFetchPositions()).thenReturn(false); + when(subscriptionState.initializingPartitions()).thenReturn(initializingPartitions); + } + private void mockSuccessfulBuildRequestForValidatingPositions(SubscriptionState.FetchPosition position, Node leader) { when(subscriptionState.partitionsNeedingValidation(time.milliseconds())).thenReturn(Collections.singleton(TEST_PARTITION_1)); when(subscriptionState.position(any())).thenReturn(position, position); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 84a7ac84d1c7f..80315099fb892 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -68,6 +68,7 @@ public class ApplicationEventProcessorTest { private final CommitRequestManager commitRequestManager = mock(CommitRequestManager.class); private final ConsumerHeartbeatRequestManager heartbeatRequestManager = mock(ConsumerHeartbeatRequestManager.class); private final ConsumerMembershipManager membershipManager = mock(ConsumerMembershipManager.class); + private final OffsetsRequestManager offsetsRequestManager = mock(OffsetsRequestManager.class); private final SubscriptionState subscriptionState = mock(SubscriptionState.class); private final ConsumerMetadata metadata = mock(ConsumerMetadata.class); private ApplicationEventProcessor processor; @@ -75,7 +76,7 @@ public class ApplicationEventProcessorTest { private void setupProcessor(boolean withGroupId) { RequestManagers requestManagers = new RequestManagers( new LogContext(), - mock(OffsetsRequestManager.class), + offsetsRequestManager, mock(TopicMetadataRequestManager.class), mock(FetchRequestManager.class), withGroupId ? 
Optional.of(mock(CoordinatorRequestManager.class)) : Optional.empty(), @@ -128,8 +129,7 @@ private static Stream applicationEvents() { Arguments.of(new PollEvent(100)), Arguments.of(new AsyncCommitEvent(new HashMap<>())), Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)), - Arguments.of(new ResetPositionsEvent(500)), - Arguments.of(new ValidatePositionsEvent(500)), + Arguments.of(new CheckAndUpdatePositionsEvent(500)), Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)), Arguments.of(new AssignmentChangeEvent(12345, 12345, Collections.emptyList()))); } @@ -144,14 +144,6 @@ public void testListOffsetsEventIsProcessed(boolean requireTimestamp) { verify(applicationEventProcessor).process(any(ListOffsetsEvent.class)); } - @Test - public void testResetPositionsProcess() { - ApplicationEventProcessor applicationEventProcessor = mock(ApplicationEventProcessor.class); - ResetPositionsEvent event = new ResetPositionsEvent(calculateDeadlineMs(time, 100)); - applicationEventProcessor.process(event); - verify(applicationEventProcessor).process(any(ResetPositionsEvent.class)); - } - @ParameterizedTest @ValueSource(booleans = {true, false}) public void testAssignmentChangeEvent(boolean withGroupId) {