KAFKA-17066 new consumer updateFetchPositions all in background thread (#16885)

Fix for the known issue that the logic for updating fetch positions in the new consumer was being performed partly in the app thread and partly in the background thread, potentially leading to race conditions on the subscription state.

This PR moves the updateFetchPositions logic to the background thread as a single event (instead of triggering separate ValidatePositions, FetchCommittedOffsets, and ListOffsets events). A new CheckAndUpdatePositionsEvent is triggered from the app thread and processed in the background, where it performs those same operations and updates the subscription state accordingly, without blocking the background thread.
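As an aside for readers less familiar with the new consumer's threading model, here is a minimal, self-contained sketch of the pattern this change adopts. It is plain Java, not Kafka's internal classes: CheckPositionsEvent, the queue, and all names are illustrative stand-ins. The application thread enqueues one composite event carrying a deadline and a future, the background thread (the only thread touching the shared state) does all the work and completes the future, and the application thread merely blocks on the result.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative stand-in for the composite event: it carries an absolute deadline
// and a future the application thread can wait on.
class CheckPositionsEvent {
    final long deadlineMs;
    final CompletableFuture<Boolean> future = new CompletableFuture<>();

    CheckPositionsEvent(long deadlineMs) {
        this.deadlineMs = deadlineMs;
    }
}

public class SingleEventSketch {
    private static final BlockingQueue<CheckPositionsEvent> QUEUE = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws Exception {
        // Background thread: the only thread that touches (here, simulates) the
        // subscription state, so no cross-thread races are possible.
        Thread background = new Thread(() -> {
            try {
                while (true) {
                    CheckPositionsEvent event = QUEUE.take();
                    // ...validate positions, fetch committed offsets, list offsets...
                    event.future.complete(true); // "all fetch positions resolved"
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.setDaemon(true);
        background.start();

        // Application thread: enqueue one composite event and wait on its future,
        // mirroring the addAndGet(event) call in this PR.
        long timeoutMs = 1_000L;
        CheckPositionsEvent event = new CheckPositionsEvent(System.currentTimeMillis() + timeoutMs);
        QUEUE.add(event);
        try {
            boolean hasAllFetchPositions = event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
            System.out.println("hasAllFetchPositions = " + hasAllFetchPositions);
        } catch (TimeoutException e) {
            System.out.println("timed out; positions will be retried on the next poll");
        }
    }
}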

This PR maintains the existing logic of keeping a pending offset fetch that does not complete within the lifetime of one updateFetchPositions attempt, so that it may be reused on the next call to updateFetchPositions.
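The reuse rule behind that pending request is subtle, so here is a hedged sketch of the bookkeeping it implies, matching the conditions in the removed canReusePendingOffsetFetchEvent below (exists, same partition set, not expired). All names and types here are hypothetical, not the PR's internals:

import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical bookkeeping: a pending fetch survives a timed-out attempt and is
// reused only if it exists, covers the same partitions, and has not passed its
// deadline; otherwise a fresh one replaces it.
class PendingOffsetFetchSketch {

    static final class PendingFetch {
        final Set<String> partitions;
        final long deadlineMs;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        PendingFetch(Set<String> partitions, long deadlineMs) {
            this.partitions = partitions;
            this.deadlineMs = deadlineMs;
        }
    }

    private PendingFetch pending; // survives across updateFetchPositions attempts

    PendingFetch fetchFor(Set<String> partitions, long newDeadlineMs, long nowMs) {
        boolean reusable = pending != null
                && pending.partitions.equals(partitions) // same partition set
                && pending.deadlineMs > nowMs;           // not yet expired
        if (!reusable) {
            pending = new PendingFetch(partitions, newDeadlineMs);
            // ...send the actual offset-fetch request and complete `future` later...
        }
        return pending; // caller waits on pending.future with its own (shorter) timer
    }
}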

Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
lianetm authored Sep 15, 2024
1 parent f7430cf commit 6744a71
Showing 12 changed files with 491 additions and 477 deletions.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -42,6 +42,7 @@
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
@@ -54,13 +55,11 @@
 import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
-import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
 import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
-import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
 import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
 import org.apache.kafka.common.Cluster;
@@ -130,7 +129,6 @@
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createLogContext;
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
 import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.refreshCommittedOffsets;
 import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 import static org.apache.kafka.common.utils.Utils.isBlank;
@@ -254,8 +252,6 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
     private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
     private final AtomicInteger refCount = new AtomicInteger(0);
 
-    private FetchCommittedOffsetsEvent pendingOffsetFetchEvent;
-
     AsyncKafkaConsumer(final ConsumerConfig config,
                        final Deserializer<K> keyDeserializer,
                        final Deserializer<V> valueDeserializer) {
@@ -1576,48 +1572,28 @@ private Fetch<K, V> collectFetch() {
 
         return fetch;
     }
 
     /**
      * Set the fetch position to the committed position (if there is one)
      * or reset it using the offset reset policy the user has configured.
      *
-     * @throws AuthenticationException If authentication fails. See the exception for more details
-     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
-     *                                       defined
+     * @return true iff the operation completed without timing out
+     * @throws AuthenticationException       If authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *                                       defined
      */
     private boolean updateFetchPositions(final Timer timer) {
         cachedSubscriptionHasAllFetchPositions = false;
         try {
-            // Validate positions using the partition leader end offsets, to detect if any partition
-            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
-            // request, retrieve the partition end offsets, and validate the current position against it.
-            applicationEventHandler.addAndGet(new ValidatePositionsEvent(calculateDeadlineMs(timer)));
-
-            cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
-            if (cachedSubscriptionHasAllFetchPositions) return true;
-
-            // Reset positions using committed offsets retrieved from the group coordinator, for any
-            // partitions which do not have a valid position and are not awaiting reset. This will
-            // trigger an OffsetFetch request and update positions with the offsets retrieved. This
-            // will only do a coordinator lookup if there are partitions which have missing
-            // positions, so a consumer with manually assigned partitions can avoid a coordinator
-            // dependence by always ensuring that assigned partitions have an initial position.
-            if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer))
-                return false;
-
-            // If there are partitions still needing a position and a reset policy is defined,
-            // request reset using the default policy. If no reset strategy is defined and there
-            // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
-            subscriptions.resetInitializingPositions();
-
-            // Reset positions using partition offsets retrieved from the leader, for any partitions
-            // which are awaiting reset. This will trigger a ListOffset request, retrieve the
-            // partition offsets according to the strategy (ex. earliest, latest), and update the
-            // positions.
-            applicationEventHandler.addAndGet(new ResetPositionsEvent(calculateDeadlineMs(timer)));
-            return true;
+            CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
+            wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
+            cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
         } catch (TimeoutException e) {
             return false;
+        } finally {
+            wakeupTrigger.clearTask();
         }
+        return true;
     }
 
     /**
@@ -1629,79 +1605,6 @@ private boolean isCommittedOffsetsManagementEnabled() {
         return groupMetadata.get().isPresent();
     }
 
-    /**
-     * Refresh the committed offsets for partitions that require initialization.
-     *
-     * @param timer Timer bounding how long this method can block
-     * @return true iff the operation completed within the timeout
-     */
-    private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
-        final Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();
-
-        if (initializingPartitions.isEmpty())
-            return true;
-
-        log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
-
-        // The shorter the timeout provided to poll(), the more likely the offsets fetch will time out. To handle
-        // this case, on the first attempt to fetch the committed offsets, a FetchCommittedOffsetsEvent is created
-        // (with potentially a longer timeout) and stored. The event is used for the first attempt, but in the
-        // case it times out, subsequent attempts will also use the event in order to wait for the results.
-        if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
-            // Give the event a reasonable amount of time to complete.
-            final long timeoutMs = Math.max(defaultApiTimeoutMs, timer.remainingMs());
-            final long deadlineMs = calculateDeadlineMs(time, timeoutMs);
-            pendingOffsetFetchEvent = new FetchCommittedOffsetsEvent(initializingPartitions, deadlineMs);
-            applicationEventHandler.add(pendingOffsetFetchEvent);
-        }
-
-        final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = pendingOffsetFetchEvent.future();
-
-        try {
-            wakeupTrigger.setActiveTask(future);
-            final Map<TopicPartition, OffsetAndMetadata> offsets = ConsumerUtils.getResult(future, timer);
-
-            // Clear the pending event once its result is successfully retrieved.
-            pendingOffsetFetchEvent = null;
-
-            refreshCommittedOffsets(offsets, metadata, subscriptions);
-            return true;
-        } catch (TimeoutException e) {
-            log.debug(
-                "The committed offsets for the following partition(s) could not be refreshed within the timeout: {} ",
-                initializingPartitions
-            );
-            return false;
-        } catch (InterruptException e) {
-            throw e;
-        } catch (Throwable t) {
-            pendingOffsetFetchEvent = null;
-            throw ConsumerUtils.maybeWrapAsKafkaException(t);
-        } finally {
-            wakeupTrigger.clearTask();
-        }
-    }
-
-    /**
-     * This determines if the {@link #pendingOffsetFetchEvent pending offset fetch event} can be reused. Reuse
-     * is only possible if all the following conditions are true:
-     *
-     * <ul>
-     *     <li>A pending offset fetch event exists</li>
-     *     <li>The partition set of the pending offset fetch event is the same as the given partition set</li>
-     *     <li>The pending offset fetch event has not expired</li>
-     * </ul>
-     */
-    private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions) {
-        if (pendingOffsetFetchEvent == null)
-            return false;
-
-        if (!pendingOffsetFetchEvent.partitions().equals(partitions))
-            return false;
-
-        return pendingOffsetFetchEvent.deadlineMs() > time.milliseconds();
-    }
-
     private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
         if (offsetAndMetadata != null)
             offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
@@ -2002,10 +1905,6 @@ SubscriptionState subscriptions() {
         return subscriptions;
     }
 
-    boolean hasPendingOffsetFetchEvent() {
-        return pendingOffsetFetchEvent != null;
-    }
-
     private void maybeUpdateSubscriptionMetadata() {
         if (this.metadataVersionSnapshot < metadata.updateVersion()) {
            this.metadataVersionSnapshot = metadata.updateVersion();
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -215,9 +215,9 @@ public static void refreshCommittedOffsets(final Map<TopicPartition, OffsetAndMe
         }
     }
 
-    public static <T> T getResult(Future<T> future, Timer timer) {
+    public static <T> T getResult(Future<T> future, long timeoutMs) {
         try {
-            return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             if (e.getCause() instanceof IllegalStateException)
                 throw (IllegalStateException) e.getCause();
@@ -229,6 +229,10 @@ public static <T> T getResult(Future<T> future, Timer timer) {
         }
     }
 
+    public static <T> T getResult(Future<T> future, Timer timer) {
+        return getResult(future, timer.remainingMs());
+    }
+
     public static <T> T getResult(Future<T> future) {
         try {
             return future.get();
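As context for the ConsumerUtils hunks above: the millisecond-based getResult overload is the new primitive, and the Timer-based variant now simply delegates to it. A standalone sketch of the same shape (GetResultSketch and its helper are illustrative, not the internal class):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class GetResultSketch {

    // Same shape as the new millisecond-based overload: block on the future for at
    // most timeoutMs, letting TimeoutException propagate to the caller.
    static <T> T getResult(Future<T> future, long timeoutMs) throws Exception {
        return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        Future<String> future = CompletableFuture.completedFuture("offsets fetched");
        // Callers that previously passed a Timer can pass timer.remainingMs() instead.
        System.out.println(getResult(future, 500L));
    }
}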