Skip to content

Commit

Permalink
KAFKA-15974: Enforce that event processing respects user-provided tim…
Browse files Browse the repository at this point in the history
…eout (apache#15640)

The intention of the CompletableApplicationEvent is for a Consumer to enqueue the event and then block, waiting for it to complete. The application thread will block for up to the duration of the timeout. This change introduces a consistent mechanism for expiring events by checking their timeout values.

The CompletableEventReaper is a new class that tracks CompletableEvents that are enqueued. Both the application thread and the network I/O thread maintain their own reaper instances. The application thread will track any CompletableBackgroundEvents that it receives and the network I/O thread will do the same with any CompletableApplicationEvents it receives. The application and network I/O threads will check their tracked events, and if any are expired, the reaper will invoke each event's CompletableFuture.completeExceptionally() method with a TimeoutException.

On closing the AsyncKafkaConsumer, both threads will invoke their respective reapers to cancel any unprocessed events in their queues. In this case, the reaper will invoke each event's CompletableFuture.completeExceptionally() method with a CancellationException instead of a TimeoutException to differentiate the two cases.

The overall design for the expiration mechanism is captured on the Apache wiki and the original issue (KAFKA-15848) has more background on the cause.

Note: this change only handles the event expiration and does not cover the network request expiration. That is handled in a follow-up Jira (KAFKA-16200) that builds atop this change.

This change also includes some minor refactoring of the EventProcessor and its implementations. This allows the event processor logic to focus on processing individual events rather than also handling batches of events.

Reviewers: Lianet Magrans <[email protected]>, Philip Nee <[email protected]>, Bruno Cadonna <[email protected]>
  • Loading branch information
kirktrue authored and chiacyu committed Jun 1, 2024
1 parent b3f22ea commit c4916b5
Show file tree
Hide file tree
Showing 27 changed files with 802 additions and 383 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.utils.KafkaThread;
Expand All @@ -31,9 +33,11 @@
import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
Expand All @@ -50,6 +54,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
private static final String BACKGROUND_THREAD_NAME = "consumer_background_thread";
private final Time time;
private final Logger log;
private final BlockingQueue<ApplicationEvent> applicationEventQueue;
private final CompletableEventReaper applicationEventReaper;
private final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier;
private final Supplier<NetworkClientDelegate> networkClientDelegateSupplier;
private final Supplier<RequestManagers> requestManagersSupplier;
Expand All @@ -63,12 +69,16 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {

public ConsumerNetworkThread(LogContext logContext,
Time time,
BlockingQueue<ApplicationEvent> applicationEventQueue,
CompletableEventReaper applicationEventReaper,
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
Supplier<RequestManagers> requestManagersSupplier) {
super(BACKGROUND_THREAD_NAME, true);
this.time = time;
this.log = logContext.logger(getClass());
this.applicationEventQueue = applicationEventQueue;
this.applicationEventReaper = applicationEventReaper;
this.applicationEventProcessorSupplier = applicationEventProcessorSupplier;
this.networkClientDelegateSupplier = networkClientDelegateSupplier;
this.requestManagersSupplier = requestManagersSupplier;
Expand Down Expand Up @@ -125,10 +135,7 @@ void initializeResources() {
* </ol>
*/
void runOnce() {
// Process the events—if any—that were produced by the application thread. It is possible that when processing
// an event generates an error. In such cases, the processor will log an exception, but we do not want those
// errors to be propagated to the caller.
applicationEventProcessor.process();
processApplicationEvents();

final long currentTimeMs = time.milliseconds();
final long pollWaitTimeMs = requestManagers.entries().stream()
Expand All @@ -144,6 +151,36 @@ void runOnce() {
.map(Optional::get)
.map(rm -> rm.maximumTimeToWait(currentTimeMs))
.reduce(Long.MAX_VALUE, Math::min);

reapExpiredApplicationEvents(currentTimeMs);
}

/**
 * Drains whatever events are currently sitting in the application event queue and hands them,
 * one at a time, to the application event processor.
 *
 * <p>Each event that is a {@link CompletableEvent} is first registered with the application event
 * reaper. Any {@link Throwable} raised while registering or processing a single event is logged
 * at WARN level and is not propagated, so the remaining drained events are still handled.
 */
private void processApplicationEvents() {
    final List<ApplicationEvent> pending = new LinkedList<>();
    applicationEventQueue.drainTo(pending);

    pending.forEach(pendingEvent -> {
        try {
            // Registration happens before processing, matching the order the reaper is fed in.
            if (pendingEvent instanceof CompletableEvent) {
                CompletableEvent<?> completable = (CompletableEvent<?>) pendingEvent;
                applicationEventReaper.add(completable);
            }

            applicationEventProcessor.process(pendingEvent);
        } catch (Throwable error) {
            log.warn("Error processing event {}", error.getMessage(), error);
        }
    });
}

/**
* "Complete" any events that have expired. This cleanup step should only be called after the network I/O
* thread has made at least one call to {@link NetworkClientDelegate#poll(long, long) poll} so that each event
* is given at least one attempt to satisfy any network requests <em>before</em> checking if a timeout has expired.
*
* @param currentTimeMs the time value, in milliseconds, that is passed through to the application event reaper
*/
private void reapExpiredApplicationEvents(long currentTimeMs) {
// Pure delegation: the reaper owns the expiration check for the events it tracks.
applicationEventReaper.reap(currentTimeMs);
}

/**
Expand Down Expand Up @@ -273,9 +310,10 @@ void cleanup() {
log.error("Unexpected error during shutdown. Proceed with closing.", e);
} finally {
sendUnsentRequests(timer);
applicationEventReaper.reap(applicationEventQueue);

closeQuietly(requestManagers, "request managers");
closeQuietly(networkClientDelegate, "network client delegate");
closeQuietly(applicationEventProcessor, "application event processor");
log.debug("Closed the consumer network thread");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(Consume
Set<TopicPartition> partitions) {
SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
sortedPartitions.addAll(partitions);

CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
backgroundEventHandler.add(event);
log.debug("The event to trigger the {} method execution was enqueued successfully", methodName.fullyQualifiedMethodName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Timer;

import java.util.List;
import java.util.Map;

public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {

protected AbstractTopicMetadataEvent(final Type type, final Timer timer) {
super(type, timer);
protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) {
super(type, deadlineMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.common.utils.Timer;

public class AllTopicsMetadataEvent extends AbstractTopicMetadataEvent {

public AllTopicsMetadataEvent(final Timer timer) {
super(Type.ALL_TOPICS_METADATA, timer);
public AllTopicsMetadataEvent(final long deadlineMs) {
super(Type.ALL_TOPICS_METADATA, deadlineMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

Expand All @@ -32,7 +31,6 @@
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
Expand All @@ -49,13 +47,16 @@ public class ApplicationEventHandler implements Closeable {
public ApplicationEventHandler(final LogContext logContext,
final Time time,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final CompletableEventReaper applicationEventReaper,
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
final Supplier<RequestManagers> requestManagersSupplier) {
this.log = logContext.logger(ApplicationEventHandler.class);
this.applicationEventQueue = applicationEventQueue;
this.networkThread = new ConsumerNetworkThread(logContext,
time,
applicationEventQueue,
applicationEventReaper,
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier);
Expand Down Expand Up @@ -99,17 +100,16 @@ public long maximumTimeToWait() {
*
* <p/>
*
* See {@link ConsumerUtils#getResult(Future, Timer)} and {@link Future#get(long, TimeUnit)} for more details.
* See {@link ConsumerUtils#getResult(Future)} for more details.
*
* @param event A {@link CompletableApplicationEvent} created by the polling thread
* @return Value that is the result of the event
* @param <T> Type of return value of the event
*/
public <T> T addAndGet(final CompletableApplicationEvent<T> event, final Timer timer) {
public <T> T addAndGet(final CompletableApplicationEvent<T> event) {
Objects.requireNonNull(event, "CompletableApplicationEvent provided to addAndGet must be non-null");
Objects.requireNonNull(timer, "Timer provided to addAndGet must be non-null");
add(event);
return ConsumerUtils.getResult(event.future(), timer);
return ConsumerUtils.getResult(event.future());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
Expand All @@ -42,31 +41,20 @@
* An {@link EventProcessor} that is created and executes in the {@link ConsumerNetworkThread network thread}
* which processes {@link ApplicationEvent application events} generated by the application thread.
*/
public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> {
public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {

private final Logger log;
private final ConsumerMetadata metadata;
private final RequestManagers requestManagers;

public ApplicationEventProcessor(final LogContext logContext,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final RequestManagers requestManagers,
final ConsumerMetadata metadata) {
super(logContext, applicationEventQueue);
this.log = logContext.logger(ApplicationEventProcessor.class);
this.requestManagers = requestManagers;
this.metadata = metadata;
}

/**
* Process the events—if any—that were produced by the application thread. It is possible that when processing
* an event generates an error. In such cases, the processor will log an exception, but we do not want those
* errors to be propagated to the caller.
*/
public boolean process() {
return process((event, error) -> error.ifPresent(e -> log.warn("Error processing event {}", e.getMessage(), e)));
}

@SuppressWarnings({"CyclomaticComplexity"})
@Override
public void process(ApplicationEvent event) {
Expand Down Expand Up @@ -273,7 +261,7 @@ private void process(final ConsumerRebalanceListenerCallbackCompletedEvent event
manager.consumerRebalanceListenerCallbackCompleted(event);
}

private void process(final CommitOnCloseEvent event) {
private void process(@SuppressWarnings("unused") final CommitOnCloseEvent event) {
if (!requestManagers.commitRequestManager.isPresent())
return;
log.debug("Signal CommitRequestManager closing");
Expand Down Expand Up @@ -309,15 +297,13 @@ private void process(final LeaveOnCloseEvent event) {
*/
public static Supplier<ApplicationEventProcessor> supplier(final LogContext logContext,
final ConsumerMetadata metadata,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final Supplier<RequestManagers> requestManagersSupplier) {
return new CachedSupplier<ApplicationEventProcessor>() {
@Override
protected ApplicationEventProcessor create() {
RequestManagers requestManagers = requestManagersSupplier.get();
return new ApplicationEventProcessor(
logContext,
applicationEventQueue,
requestManagers,
metadata
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Timer;

import java.util.Collections;
import java.util.Map;
Expand All @@ -30,11 +29,6 @@ public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
*/
private final Map<TopicPartition, OffsetAndMetadata> offsets;

protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final Timer timer) {
super(type, timer);
this.offsets = validate(offsets);
}

protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {
super(type, deadlineMs);
this.offsets = validate(offsets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.common.utils.Timer;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -32,13 +29,9 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent im
private final CompletableFuture<T> future;
private final long deadlineMs;

protected CompletableApplicationEvent(final Type type, final Timer timer) {
super(type);
this.future = new CompletableFuture<>();
Objects.requireNonNull(timer);
this.deadlineMs = timer.remainingMs() + timer.currentTimeMs();
}

/**
* <em>Note</em>: the {@code deadlineMs} is the future time of expiration, <em>not</em> a timeout.
*/
protected CompletableApplicationEvent(final Type type, final long deadlineMs) {
super(type);
this.future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,29 @@
public abstract class CompletableBackgroundEvent<T> extends BackgroundEvent implements CompletableEvent<T> {

private final CompletableFuture<T> future;
private final long deadlineMs;

protected CompletableBackgroundEvent(final Type type) {
/**
* <em>Note</em>: the {@code deadlineMs} is the future time of expiration, <em>not</em> a timeout.
*/
protected CompletableBackgroundEvent(final Type type, final long deadlineMs) {
super(type);
this.future = new CompletableFuture<>();
this.deadlineMs = deadlineMs;
}

/**
* Returns the {@link CompletableFuture} that was created for this event in its constructor.
*
* @return the future associated with this event
*/
@Override
public CompletableFuture<T> future() {
return future;
}

/**
* Returns the deadline that was supplied to the constructor. As noted on the constructor, this is the
* future time of expiration, <em>not</em> a timeout.
*
* @return the deadline, in milliseconds, for this event
*/
@Override
public long deadlineMs() {
return deadlineMs;
}

@Override
protected String toStringBase() {
return super.toStringBase() + ", future=" + future;
return super.toStringBase() + ", future=" + future + ", deadlineMs=" + deadlineMs;
}
}
Loading

0 comments on commit c4916b5

Please sign in to comment.