Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15974: Enforce that event processing respects user-provided timeout #15640

Merged
merged 160 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 152 commits
Commits
Show all changes
160 commits
Select commit Hold shift + click to select a range
666d74b
WIP
kirktrue Jan 23, 2024
9d6ec58
Lots more changes
kirktrue Jan 23, 2024
bbbfec7
Updates
kirktrue Jan 24, 2024
a0f6fc5
Reverting
kirktrue Jan 24, 2024
72b96b5
Reverting toString() changes
kirktrue Jan 24, 2024
ca1f16d
Revert toString() changes
kirktrue Jan 24, 2024
c88fa3b
More reverting
kirktrue Jan 24, 2024
3b5bc62
Reverts
kirktrue Jan 24, 2024
46af018
Reverting
kirktrue Jan 24, 2024
4998699
Reverts
kirktrue Jan 24, 2024
1366f08
Indent SNAFU
kirktrue Jan 24, 2024
73fd588
Indentation reverts
kirktrue Jan 24, 2024
6030c8e
Update AsyncKafkaConsumer.java
kirktrue Jan 24, 2024
7249756
Everything compiles (for now)
kirktrue Jan 24, 2024
338440e
Update HeartbeatRequestManagerTest.java
kirktrue Jan 24, 2024
7ee977b
Updates to fix some tests
kirktrue Jan 24, 2024
eff925d
Updates
kirktrue Jan 24, 2024
d1dfdc6
Update ApplicationEventProcessorTest.java
kirktrue Jan 24, 2024
79880a4
Updates
kirktrue Jan 25, 2024
d96e75b
Update TopicMetadataApplicationEvent.java
kirktrue Jan 25, 2024
1ee795e
Updates
kirktrue Jan 25, 2024
a685b4e
Update ErrorBackgroundEvent.java
kirktrue Jan 25, 2024
dcdb767
Updates
kirktrue Jan 25, 2024
de267b7
Updates
kirktrue Jan 25, 2024
c94bcb9
Update ApplicationEventProcessor.java
kirktrue Jan 25, 2024
337c306
Update ApplicationEventProcessor.java
kirktrue Jan 25, 2024
422f935
Update ApplicationEventProcessor.java
kirktrue Jan 25, 2024
d08d4a6
Updates to include RelaxedCompletableFuture
kirktrue Jan 31, 2024
fe6a890
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-requests
kirktrue Feb 21, 2024
1533901
Updates
kirktrue Feb 21, 2024
5d66644
Update CoordinatorRequestManager.java
kirktrue Feb 21, 2024
fc9d6e8
Updates
kirktrue Feb 21, 2024
c301d5f
Reverting whitespace and other unwanted changes
kirktrue Feb 21, 2024
e4d0658
Reverting whitespace-only changes
kirktrue Feb 21, 2024
669edb9
Whitespace
kirktrue Feb 21, 2024
f963337
UGHGGHGHGHGGH
kirktrue Feb 21, 2024
733f3f0
Update TopicMetadataRequestManager.java
kirktrue Feb 21, 2024
8de90d6
Update ApplicationEventProcessor.java
kirktrue Feb 21, 2024
296163f
Update ApplicationEventProcessor.java
kirktrue Feb 21, 2024
e761ba5
Updates
kirktrue Feb 22, 2024
e5bdf82
Reverting unnecessary changesd
kirktrue Feb 22, 2024
60f9a0a
Whitespace and stuff
kirktrue Feb 22, 2024
2699626
Whitespace
kirktrue Feb 22, 2024
b6907ab
Whitspace
kirktrue Feb 22, 2024
567e3f8
Whitespace
kirktrue Feb 22, 2024
0bd6c66
Delete RelaxedCompletableFuture.java
kirktrue Feb 22, 2024
3e73bb7
Updates
kirktrue Feb 22, 2024
e4c0486
Updates
kirktrue Feb 22, 2024
a9ce15a
Updates
kirktrue Feb 22, 2024
c2b9b52
Updates
kirktrue Feb 22, 2024
f446644
Ugh
kirktrue Feb 22, 2024
9940cf1
Update ConsumerNetworkThreadTest.java
kirktrue Feb 22, 2024
c1bf7d9
Update CommitApplicationEvent.java
kirktrue Feb 22, 2024
8fa83c6
Proof of concept moving timeout logic to network thread
kirktrue Feb 22, 2024
3b2b99a
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-requests
kirktrue Feb 22, 2024
162f4fa
Merge remote-tracking branch 'origin/trunk' into KAFKA-15974-enforce-…
kirktrue Feb 27, 2024
63b9546
Reverting, for the most part
kirktrue Feb 27, 2024
29fdd33
WIP
kirktrue Feb 27, 2024
92d1062
Updates to events
kirktrue Feb 27, 2024
dfec9d1
Remove Timer from ConsumerRebalanceListenerCallbackNeededEvent
kirktrue Feb 27, 2024
e1b1197
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-requests
kirktrue Mar 1, 2024
44757aa
Updates to clean up more clumsiness
kirktrue Mar 1, 2024
84c8f8d
Update ApplicationEventProcessor.java
kirktrue Mar 1, 2024
b3519aa
Reverting unnecessary changes
kirktrue Mar 1, 2024
00eb0cb
Reverting more code to make diffs less
kirktrue Mar 1, 2024
528dc4f
Update ApplicationEventProcessor.java
kirktrue Mar 1, 2024
036e33b
Update EventProcessor.java
kirktrue Mar 1, 2024
e8dfea3
Update ApplicationEventProcessor.java
kirktrue Mar 1, 2024
4eae988
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-requests
kirktrue Mar 1, 2024
ce7f115
Merge remote-tracking branch 'origin/trunk' into KAFKA-15974-enforce-…
kirktrue Mar 8, 2024
027b232
Updates
kirktrue Mar 8, 2024
1d4d6b0
Updates
kirktrue Mar 8, 2024
c00ffc2
Updates to timeout values
kirktrue Mar 8, 2024
25eea60
Refactored the event processors and event reaper
kirktrue Mar 10, 2024
83cbf6f
Updates
kirktrue Mar 11, 2024
339557f
Update AsyncKafkaConsumer.java
kirktrue Mar 11, 2024
22b0329
Reverting code
kirktrue Mar 11, 2024
02ccdc6
Update AsyncKafkaConsumer.java
kirktrue Mar 11, 2024
223aadd
More general cleanup
kirktrue Mar 11, 2024
001fbe9
Reverting unnecessary changes
kirktrue Mar 11, 2024
9f1dfbe
Revert unnecessary changes
kirktrue Mar 11, 2024
b055cef
Updates to remove unnecessary changes
kirktrue Mar 11, 2024
cb75d3f
Revert unnecessary changes
kirktrue Mar 11, 2024
e893f0f
More cleanup
kirktrue Mar 11, 2024
53a4ae2
Comments changes
kirktrue Mar 13, 2024
765eb22
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-requests
kirktrue Mar 21, 2024
d93ab5c
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-requests
kirktrue Mar 22, 2024
fa67c00
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-requests
kirktrue Mar 25, 2024
6c431ec
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-requests
kirktrue Mar 26, 2024
42c6fe6
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Mar 28, 2024
6292034
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Mar 29, 2024
8a7d7c3
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Apr 1, 2024
968a535
Updates to include more unit tests
kirktrue Apr 1, 2024
c4ff61b
Updated EventProcessor JavaDoc
kirktrue Apr 2, 2024
dab563b
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Apr 4, 2024
bd3b3fc
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Apr 8, 2024
69f3f91
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Apr 9, 2024
9084cbd
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Apr 11, 2024
df70086
Refactoring for less diff noise
kirktrue Apr 11, 2024
83790d4
Trying more to clean up diffs
kirktrue Apr 11, 2024
7289abf
Reverting some MockTime related tweaks
kirktrue Apr 11, 2024
5eeeea2
More reverts
kirktrue Apr 11, 2024
b520fbd
More reverts
kirktrue Apr 11, 2024
211f297
Yet still more refactoring
kirktrue Apr 11, 2024
26d369b
Reverting the reverts as it broke other stuff :(
kirktrue Apr 11, 2024
147e5a3
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Apr 16, 2024
0ae194f
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Apr 17, 2024
37cb285
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Apr 23, 2024
23003eb
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Apr 25, 2024
7f2d231
Added a brief comment about the case where close() is called from wit…
kirktrue Apr 25, 2024
0c646ad
Minor refactoring of comments and code in ConsumerNetworkThread per P…
kirktrue Apr 25, 2024
fd48d1f
Update clients/src/main/java/org/apache/kafka/clients/consumer/intern…
kirktrue Apr 25, 2024
68beee5
Made reapIncomplete's stream processing code consistent with reapExpi…
kirktrue Apr 25, 2024
337d1f7
Added calculateDeadlineMs to CompletableEvent for reuse by implementa…
kirktrue Apr 25, 2024
74a7ed7
Removed EventProcessor.close() as it is no longer needed
kirktrue Apr 25, 2024
92c2d85
Removed the need for explicit Long.MAX_VALUE-based Timer objects for …
kirktrue Apr 25, 2024
2981ebe
Removed superfluous import
kirktrue Apr 25, 2024
6c34198
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Apr 25, 2024
36388d5
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue Apr 29, 2024
1e2144b
Updates to use long instead of Timer to indicate the deadline
kirktrue Apr 29, 2024
8bec7d7
More updates to remove another usage of Timer to calculate deadline
kirktrue Apr 29, 2024
097522a
Moved SyncCommitEvent to use the user-provided Duration directly
kirktrue Apr 29, 2024
87b3611
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue May 1, 2024
1803dac
Reworked class-level documentation for CompletableEventReaper
kirktrue May 1, 2024
71af574
More typos in CompletableEventReaper comments
kirktrue May 1, 2024
63903a1
Changed the ambiguous process method to processBackgroundEvents
kirktrue May 1, 2024
909f4dc
Changes to processBackgroundEvents comments
kirktrue May 1, 2024
1aa4c31
Removed logging from processBackgroundEvents to simplify logic
kirktrue May 1, 2024
d8959db
Changed the signature of reapIncomplete to remove boilerplate for cal…
kirktrue May 1, 2024
0248c92
Added back 'final' parameter qualifier
kirktrue May 1, 2024
290b081
Made the abstract CommitEvent class' constructor protected (again)
kirktrue May 1, 2024
987dc4a
Updated CompletableEvent.future() documentation to reflect using comp…
kirktrue May 1, 2024
27ed6fc
Removed MockTime instances created with non-default arguments
kirktrue May 1, 2024
b54c817
Minor formatting change per feedback
kirktrue May 1, 2024
26d64e8
Restored modified comment to ApplicationEventHandler.addAndGet()
kirktrue May 1, 2024
5d212c2
Removed unnecessary import that broke the build
kirktrue May 1, 2024
f800e71
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue May 14, 2024
a92e25e
Using a Collection instead of BlockingQueue for simplicity
kirktrue May 14, 2024
8368718
Removed use of Timer in CompletableEventReaperTest
kirktrue May 14, 2024
40cc477
Minor refactoring to avoid generics
kirktrue May 14, 2024
1234c1e
Added unit test to ConsumerNetworkThread and minor refactoring/renaming
kirktrue May 14, 2024
b3b93a9
Added test to AsyncKafkaConsumerTest
kirktrue May 14, 2024
16aa7ea
Update AsyncKafkaConsumer.java
kirktrue May 14, 2024
40a6e76
Removing some noisy trace logging
kirktrue May 14, 2024
b51a8c0
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue May 14, 2024
7067b95
Fixed lint error
kirktrue May 14, 2024
3d26009
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue May 15, 2024
284cf94
Update to ConsumerNetworkThread test logic
kirktrue May 15, 2024
6a3ced6
More work on improving and simplifying unit tests
kirktrue May 15, 2024
3018de2
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue May 16, 2024
ec0edd4
Added more unit tests to ensure the reaper is being properly invoked
kirktrue May 16, 2024
3eb054a
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue May 17, 2024
2e445c5
Merge branch 'trunk' into KAFKA-15974-enforce-timeout-in-events
kirktrue May 21, 2024
ae6a6cd
Testing expected value in timestamp-related reap tests
kirktrue May 21, 2024
808eb4b
Added testEnsureEventsAreCompleted() back along with all its trapping
kirktrue May 21, 2024
e9ea1e6
Update testCommitAsyncWithFencedException
kirktrue May 21, 2024
62bec7e
Changing expiration logic in CompletableEventReaper.reap() to match T…
kirktrue May 21, 2024
0f168e2
Removed outdated logic in testCommitAsyncWithFencedException
kirktrue May 21, 2024
c423560
maybeAutoCommitSync > autoCommitSync is there's no longer a conditional
kirktrue May 21, 2024
91af164
Refactored processBackgroundEvents to eliminate passing in the EventP…
kirktrue May 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.utils.KafkaThread;
Expand All @@ -31,9 +33,11 @@
import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
Expand All @@ -50,6 +54,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
private static final String BACKGROUND_THREAD_NAME = "consumer_background_thread";
private final Time time;
private final Logger log;
private final BlockingQueue<ApplicationEvent> applicationEventQueue;
private final CompletableEventReaper applicationEventReaper;
private final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier;
private final Supplier<NetworkClientDelegate> networkClientDelegateSupplier;
private final Supplier<RequestManagers> requestManagersSupplier;
Expand All @@ -63,12 +69,16 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {

public ConsumerNetworkThread(LogContext logContext,
Time time,
BlockingQueue<ApplicationEvent> applicationEventQueue,
CompletableEventReaper applicationEventReaper,
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
Supplier<RequestManagers> requestManagersSupplier) {
super(BACKGROUND_THREAD_NAME, true);
this.time = time;
this.log = logContext.logger(getClass());
this.applicationEventQueue = applicationEventQueue;
this.applicationEventReaper = applicationEventReaper;
this.applicationEventProcessorSupplier = applicationEventProcessorSupplier;
this.networkClientDelegateSupplier = networkClientDelegateSupplier;
this.requestManagersSupplier = requestManagersSupplier;
Expand Down Expand Up @@ -125,10 +135,7 @@ void initializeResources() {
* </ol>
*/
void runOnce() {
// Process the events—if any—that were produced by the application thread. It is possible that when processing
// an event generates an error. In such cases, the processor will log an exception, but we do not want those
// errors to be propagated to the caller.
applicationEventProcessor.process();
processApplicationEvents();

final long currentTimeMs = time.milliseconds();
final long pollWaitTimeMs = requestManagers.entries().stream()
Expand All @@ -144,6 +151,36 @@ void runOnce() {
.map(Optional::get)
.map(rm -> rm.maximumTimeToWait(currentTimeMs))
.reduce(Long.MAX_VALUE, Math::min);

reapExpiredApplicationEvents(currentTimeMs);
}

/**
* Process the events—if any—that were produced by the application thread.
*/
private void processApplicationEvents() {
LinkedList<ApplicationEvent> events = new LinkedList<>();
applicationEventQueue.drainTo(events);

for (ApplicationEvent event : events) {
try {
if (event instanceof CompletableEvent)
applicationEventReaper.add((CompletableEvent<?>) event);

applicationEventProcessor.process(event);
} catch (Throwable t) {
log.warn("Error processing event {}", t.getMessage(), t);
kirktrue marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

/**
* "Complete" any events that have expired. This cleanup step should only be called after the network I/O
* thread has made at least one call to {@link NetworkClientDelegate#poll(long, long) poll} so that each event
* is given least one attempt to satisfy any network requests <em>before</em> checking if a timeout has expired.
*/
private void reapExpiredApplicationEvents(long currentTimeMs) {
applicationEventReaper.reap(currentTimeMs);
}

/**
Expand Down Expand Up @@ -273,9 +310,10 @@ void cleanup() {
log.error("Unexpected error during shutdown. Proceed with closing.", e);
} finally {
sendUnsentRequests(timer);
applicationEventReaper.reap(applicationEventQueue);

closeQuietly(requestManagers, "request managers");
closeQuietly(networkClientDelegate, "network client delegate");
closeQuietly(applicationEventProcessor, "application event processor");
log.debug("Closed the consumer network thread");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(Consume
Set<TopicPartition> partitions) {
SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
sortedPartitions.addAll(partitions);

CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
backgroundEventHandler.add(event);
log.debug("The event to trigger the {} method execution was enqueued successfully", methodName.fullyQualifiedMethodName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Timer;

import java.util.List;
import java.util.Map;

public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {

protected AbstractTopicMetadataEvent(final Type type, final Timer timer) {
super(type, timer);
protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) {
super(type, deadlineMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.common.utils.Timer;

public class AllTopicsMetadataEvent extends AbstractTopicMetadataEvent {

public AllTopicsMetadataEvent(final Timer timer) {
super(Type.ALL_TOPICS_METADATA, timer);
public AllTopicsMetadataEvent(final long deadlineMs) {
super(Type.ALL_TOPICS_METADATA, deadlineMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

Expand All @@ -32,7 +31,6 @@
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
Expand All @@ -49,13 +47,16 @@ public class ApplicationEventHandler implements Closeable {
public ApplicationEventHandler(final LogContext logContext,
final Time time,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final CompletableEventReaper applicationEventReaper,
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
final Supplier<RequestManagers> requestManagersSupplier) {
this.log = logContext.logger(ApplicationEventHandler.class);
this.applicationEventQueue = applicationEventQueue;
this.networkThread = new ConsumerNetworkThread(logContext,
time,
applicationEventQueue,
applicationEventReaper,
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier);
Expand Down Expand Up @@ -99,17 +100,16 @@ public long maximumTimeToWait() {
*
* <p/>
*
* See {@link ConsumerUtils#getResult(Future, Timer)} and {@link Future#get(long, TimeUnit)} for more details.
* See {@link ConsumerUtils#getResult(Future)} for more details.
*
* @param event A {@link CompletableApplicationEvent} created by the polling thread
* @return Value that is the result of the event
* @param <T> Type of return value of the event
*/
public <T> T addAndGet(final CompletableApplicationEvent<T> event, final Timer timer) {
public <T> T addAndGet(final CompletableApplicationEvent<T> event) {
Objects.requireNonNull(event, "CompletableApplicationEvent provided to addAndGet must be non-null");
Objects.requireNonNull(timer, "Timer provided to addAndGet must be non-null");
add(event);
return ConsumerUtils.getResult(event.future(), timer);
return ConsumerUtils.getResult(event.future());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
Expand All @@ -42,31 +41,20 @@
* An {@link EventProcessor} that is created and executes in the {@link ConsumerNetworkThread network thread}
* which processes {@link ApplicationEvent application events} generated by the application thread.
*/
public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> {
public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {

private final Logger log;
private final ConsumerMetadata metadata;
private final RequestManagers requestManagers;

public ApplicationEventProcessor(final LogContext logContext,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final RequestManagers requestManagers,
final ConsumerMetadata metadata) {
super(logContext, applicationEventQueue);
this.log = logContext.logger(ApplicationEventProcessor.class);
this.requestManagers = requestManagers;
this.metadata = metadata;
}

/**
* Process the events—if any—that were produced by the application thread. It is possible that when processing
* an event generates an error. In such cases, the processor will log an exception, but we do not want those
* errors to be propagated to the caller.
*/
public boolean process() {
return process((event, error) -> error.ifPresent(e -> log.warn("Error processing event {}", e.getMessage(), e)));
}

@SuppressWarnings({"CyclomaticComplexity"})
@Override
public void process(ApplicationEvent event) {
Expand Down Expand Up @@ -273,7 +261,7 @@ private void process(final ConsumerRebalanceListenerCallbackCompletedEvent event
manager.consumerRebalanceListenerCallbackCompleted(event);
}

private void process(final CommitOnCloseEvent event) {
private void process(@SuppressWarnings("unused") final CommitOnCloseEvent event) {
if (!requestManagers.commitRequestManager.isPresent())
return;
log.debug("Signal CommitRequestManager closing");
Expand Down Expand Up @@ -309,15 +297,13 @@ private void process(final LeaveOnCloseEvent event) {
*/
public static Supplier<ApplicationEventProcessor> supplier(final LogContext logContext,
final ConsumerMetadata metadata,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final Supplier<RequestManagers> requestManagersSupplier) {
return new CachedSupplier<ApplicationEventProcessor>() {
@Override
protected ApplicationEventProcessor create() {
RequestManagers requestManagers = requestManagersSupplier.get();
return new ApplicationEventProcessor(
logContext,
applicationEventQueue,
requestManagers,
metadata
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Timer;

import java.util.Collections;
import java.util.Map;
Expand All @@ -30,11 +29,6 @@ public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
*/
private final Map<TopicPartition, OffsetAndMetadata> offsets;

protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final Timer timer) {
super(type, timer);
this.offsets = validate(offsets);
}

protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) {
super(type, deadlineMs);
this.offsets = validate(offsets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.common.utils.Timer;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -32,13 +29,9 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent im
private final CompletableFuture<T> future;
private final long deadlineMs;

protected CompletableApplicationEvent(final Type type, final Timer timer) {
super(type);
this.future = new CompletableFuture<>();
Objects.requireNonNull(timer);
this.deadlineMs = timer.remainingMs() + timer.currentTimeMs();
}

/**
* <em>Note</em>: the {@code deadlineMs} is the future time of expiration, <em>not</em> a timeout.
*/
kirktrue marked this conversation as resolved.
Show resolved Hide resolved
protected CompletableApplicationEvent(final Type type, final long deadlineMs) {
super(type);
this.future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,29 @@
public abstract class CompletableBackgroundEvent<T> extends BackgroundEvent implements CompletableEvent<T> {

private final CompletableFuture<T> future;
private final long deadlineMs;

protected CompletableBackgroundEvent(final Type type) {
/**
* <em>Note</em>: the {@code deadlineMs} is the future time of expiration, <em>not</em> a timeout.
*/
protected CompletableBackgroundEvent(final Type type, final long deadlineMs) {
super(type);
this.future = new CompletableFuture<>();
this.deadlineMs = deadlineMs;
}

@Override
public CompletableFuture<T> future() {
return future;
}

@Override
public long deadlineMs() {
return deadlineMs;
}

@Override
protected String toStringBase() {
return super.toStringBase() + ", future=" + future;
return super.toStringBase() + ", future=" + future + ", deadlineMs=" + deadlineMs;
}
}
Loading