From 057be1b147c8cfd8e49a2cdd9a0101be5da8dd13 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Mon, 28 Oct 2024 12:46:37 -0700 Subject: [PATCH] KAFKA-17439: Make polling for new records an explicit action/event in the new consumer (#17035) Reviewers: Andrew Schofield , Lianet Magrans --- .../internals/AsyncKafkaConsumer.java | 69 +++++++++++ .../internals/FetchRequestManager.java | 108 ++++++++++++++---- .../internals/events/ApplicationEvent.java | 2 +- .../events/ApplicationEventProcessor.java | 9 ++ .../events/CreateFetchRequestsEvent.java | 33 ++++++ .../clients/consumer/KafkaConsumerTest.java | 5 - .../internals/AsyncKafkaConsumerTest.java | 2 + .../internals/FetchRequestManagerTest.java | 104 ++++++++++++++++- .../events/ApplicationEventProcessorTest.java | 2 + 9 files changed, 303 insertions(+), 31 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CreateFetchRequestsEvent.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index b606df2dce072..dd652e3235e56 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -50,6 +50,7 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; +import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; @@ -708,6 +709,14 @@ public ConsumerRecords poll(final Duration timeout) { updateAssignmentMetadataIfNeeded(timer); final Fetch fetch = pollForFetches(timer); if (!fetch.isEmpty()) { + // before returning the fetched records, we can send off the next round of fetches + // and avoid block waiting for their responses to enable pipelining while the user + // is handling the fetched records. + // + // NOTE: since the consumed position has already been updated, we must not allow + // wakeups or any other errors to be triggered prior to returning the fetched records. + sendPrefetches(timer); + if (fetch.records().isEmpty()) { log.trace("Returning empty records from `poll()` " + "since the consumer's position has advanced for at least one topic partition"); @@ -1519,6 +1528,9 @@ private Fetch pollForFetches(Timer timer) { return fetch; } + // send any new fetches (won't resend pending fetches) + sendFetches(timer); + // We do not want to be stuck blocking in poll if we are missing some positions // since the offset lookup may be backing off after a failure @@ -1606,6 +1618,63 @@ private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAnd offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch)); } + /** + * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests}. + * + *

+ * + * This method takes the following steps to maintain compatibility with the {@link ClassicKafkaConsumer} method + * of the same name: + * + *

    + *
  • + * The method will wait for confirmation of the request creation before continuing. + *
  • + *
  • + * The method will throw exceptions encountered during request creation to the user immediately. + *
  • + *
  • + * The method will suppress {@link TimeoutException}s that occur while waiting for the confirmation. + * Timeouts during request creation are a byproduct of this consumer's thread communication mechanisms. + * That exception type isn't thrown in the request creation step of the {@link ClassicKafkaConsumer}. + * Additionally, timeouts will not impact the logic of {@link #pollForFetches(Timer) blocking requests} + * as it can handle requests that are created after the timeout. + *
  • + *
+ * + * @param timer Timer used to bound how long the consumer waits for the requests to be created, which in practice + * is used to avoid using {@link Long#MAX_VALUE} to wait "forever" + */ + private void sendFetches(Timer timer) { + try { + applicationEventHandler.addAndGet(new CreateFetchRequestsEvent(calculateDeadlineMs(timer))); + } catch (TimeoutException e) { + // Can be ignored, per above comments. + } + } + + /** + * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests} for the + * pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the pre-fetch case, the application thread + * will not wait for confirmation of the request creation before continuing. + * + *

+ * + * At the point this method is called, {@link KafkaConsumer#poll(Duration)} has data ready to return to the user, + * which means the consumed position was already updated. In order to prevent potential gaps in records, this + * method is designed to suppress all exceptions. + * + * @param timer Provides an upper bound for the event and its {@link CompletableFuture future} + */ + private void sendPrefetches(Timer timer) { + try { + applicationEventHandler.add(new CreateFetchRequestsEvent(calculateDeadlineMs(timer))); + } catch (Throwable t) { + // Any unexpected errors will be logged for troubleshooting, but not thrown. + log.warn("An unexpected error occurred while pre-fetching data in Consumer.poll(), but was suppressed", t); + } + } + @Override public boolean updateAssignmentMetadataIfNeeded(Timer timer) { maybeThrowFencedInstanceException(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java index 3cef94e05f8e1..c52b5453e21d9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java @@ -19,8 +19,10 @@ import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest; +import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent; import org.apache.kafka.common.Node; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.LogContext; @@ -29,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -41,6 +44,7 @@ public class FetchRequestManager extends AbstractFetch implements RequestManager { private final NetworkClientDelegate networkClientDelegate; + private CompletableFuture pendingFetchRequestFuture; FetchRequestManager(final LogContext logContext, final Time time, @@ -65,15 +69,42 @@ protected void maybeThrowAuthFailure(Node node) { networkClientDelegate.maybeThrowAuthFailure(node); } + /** + * Signals the {@link Consumer} wants requests be created for the broker nodes to fetch the next + * batch of records. + * + * @see CreateFetchRequestsEvent + * @return Future on which the caller can wait to ensure that the requests have been created + */ + public CompletableFuture createFetchRequests() { + CompletableFuture future = new CompletableFuture<>(); + + if (pendingFetchRequestFuture != null) { + // In this case, we have an outstanding fetch request, so chain the newly created future to be + // completed when the "pending" future is completed. + pendingFetchRequestFuture.whenComplete((value, exception) -> { + if (exception != null) { + future.completeExceptionally(exception); + } else { + future.complete(value); + } + }); + } else { + pendingFetchRequestFuture = future; + } + + return future; + } + /** * {@inheritDoc} */ @Override public PollResult poll(long currentTimeMs) { return pollInternal( - prepareFetchRequests(), - this::handleFetchSuccess, - this::handleFetchFailure + this::prepareFetchRequests, + this::handleFetchSuccess, + this::handleFetchFailure ); } @@ -82,9 +113,12 @@ public PollResult poll(long currentTimeMs) { */ @Override public PollResult pollOnClose(long currentTimeMs) { + // There needs to be a pending fetch request for pollInternal to create the requests. + createFetchRequests(); + // TODO: move the logic to poll to handle signal close return pollInternal( - prepareCloseFetchSessionRequests(), + this::prepareCloseFetchSessionRequests, this::handleCloseFetchSessionSuccess, this::handleCloseFetchSessionFailure ); @@ -94,28 +128,56 @@ public PollResult pollOnClose(long currentTimeMs) { * Creates the {@link PollResult poll result} that contains a list of zero or more * {@link FetchRequest.Builder fetch requests}. * - * @param fetchRequests {@link Map} of {@link Node nodes} to their {@link FetchSessionHandler.FetchRequestData} - * @param successHandler {@link ResponseHandler Handler for successful responses} - * @param errorHandler {@link ResponseHandler Handler for failure responses} + * @param fetchRequestPreparer {@link FetchRequestPreparer} to generate a {@link Map} of {@link Node nodes} + * to their {@link FetchSessionHandler.FetchRequestData} + * @param successHandler {@link ResponseHandler Handler for successful responses} + * @param errorHandler {@link ResponseHandler Handler for failure responses} * @return {@link PollResult} */ - private PollResult pollInternal(Map fetchRequests, + private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer, ResponseHandler successHandler, ResponseHandler errorHandler) { - List requests = fetchRequests.entrySet().stream().map(entry -> { - final Node fetchTarget = entry.getKey(); - final FetchSessionHandler.FetchRequestData data = entry.getValue(); - final FetchRequest.Builder request = createFetchRequest(fetchTarget, data); - final BiConsumer responseHandler = (clientResponse, error) -> { - if (error != null) - errorHandler.handle(fetchTarget, data, error); - else - successHandler.handle(fetchTarget, data, clientResponse); - }; - - return new UnsentRequest(request, Optional.of(fetchTarget)).whenComplete(responseHandler); - }).collect(Collectors.toList()); - - return new PollResult(requests); + if (pendingFetchRequestFuture == null) { + // If no explicit request for creating fetch requests was issued, just short-circuit. + return PollResult.EMPTY; + } + + try { + Map fetchRequests = fetchRequestPreparer.prepare(); + + List requests = fetchRequests.entrySet().stream().map(entry -> { + final Node fetchTarget = entry.getKey(); + final FetchSessionHandler.FetchRequestData data = entry.getValue(); + final FetchRequest.Builder request = createFetchRequest(fetchTarget, data); + final BiConsumer responseHandler = (clientResponse, error) -> { + if (error != null) + errorHandler.handle(fetchTarget, data, error); + else + successHandler.handle(fetchTarget, data, clientResponse); + }; + + return new UnsentRequest(request, Optional.of(fetchTarget)).whenComplete(responseHandler); + }).collect(Collectors.toList()); + + pendingFetchRequestFuture.complete(null); + return new PollResult(requests); + } catch (Throwable t) { + // A "dummy" poll result is returned here rather than rethrowing the error because any error + // that is thrown from any RequestManager.poll() method interrupts the polling of the other + // request managers. + pendingFetchRequestFuture.completeExceptionally(t); + return PollResult.EMPTY; + } finally { + pendingFetchRequestFuture = null; + } + } + + /** + * Simple functional interface to all passing in a method reference for improved readability. + */ + @FunctionalInterface + protected interface FetchRequestPreparer { + + Map prepare(); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index e2ee28f5702f5..0d258cda2b4e5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -32,7 +32,7 @@ public enum Type { COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE, UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, - COMMIT_ON_CLOSE, + COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE, SHARE_ACKNOWLEDGE_ON_CLOSE, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 6f6a1714bed4c..2f6ca35feafb3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -124,6 +124,10 @@ public void process(ApplicationEvent event) { process((CommitOnCloseEvent) event); return; + case CREATE_FETCH_REQUESTS: + process((CreateFetchRequestsEvent) event); + return; + case SHARE_FETCH: process((ShareFetchEvent) event); return; @@ -176,6 +180,11 @@ private void process(final PollEvent event) { } } + private void process(final CreateFetchRequestsEvent event) { + CompletableFuture future = requestManagers.fetchRequestManager.createFetchRequests(); + future.whenComplete(complete(event.future())); + } + private void process(final AsyncCommitEvent event) { if (!requestManagers.commitRequestManager.isPresent()) { return; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CreateFetchRequestsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CreateFetchRequestsEvent.java new file mode 100644 index 0000000000000..056cd4811abe2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CreateFetchRequestsEvent.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.internals.FetchRequestManager; + +/** + * {@code CreateFetchRequestsEvent} signals that the {@link Consumer} wants to issue fetch requests to the nodes + * for the partitions to which the consumer is currently subscribed. The event is completed when the + * {@link FetchRequestManager} has finished creating (i.e. not enqueuing, sending, or receiving) + * fetch requests (if any) to send to the broker nodes. + */ +public class CreateFetchRequestsEvent extends CompletableApplicationEvent { + + public CreateFetchRequestsEvent(final long deadlineMs) { + super(Type.CREATE_FETCH_REQUESTS, deadlineMs); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 9139fa16ab088..c260fa48c019b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2566,11 +2566,6 @@ public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) consumer.assign(singleton(tp0)); consumer.seek(tp0, 50L); - // For AsyncKafkaConsumer, FetchRequestManager sends FetchRequest in background thread. - // Wait for the first fetch request to avoid ListOffsetResponse mismatch. - TestUtils.waitForCondition(() -> groupProtocol == GroupProtocol.CLASSIC || requestGenerated(client, ApiKeys.FETCH), - "No fetch request sent"); - client.prepareResponse(request -> request instanceof ListOffsetsRequest, listOffsetsResponse(singletonMap(tp0, 90L))); assertEquals(singletonMap(tp0, 90L), consumer.endOffsets(Collections.singleton(tp0))); // correct lag result should be returned as well diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index f03cde308f67a..7f78f6e8bba18 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; +import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; @@ -1711,6 +1712,7 @@ public void testEnsurePollEventSentOnConsumerPoll() { consumer.subscribe(singletonList("topic1")); consumer.poll(Duration.ofMillis(100)); verify(applicationEventHandler).add(any(PollEvent.class)); + verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class)); } private Properties requiredConsumerConfigAndGroupId(final String groupId) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index c79077f11a37c..3e3f70a7443f0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.SerializationException; @@ -120,6 +121,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -134,8 +136,10 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.apache.kafka.test.TestUtils.assertOptional; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -237,8 +241,12 @@ public void teardown() throws Exception { } private int sendFetches() { + return sendFetches(true); + } + + private int sendFetches(boolean requestFetch) { offsetFetcher.validatePositionsOnMetadataChange(); - return fetcher.sendFetches(); + return fetcher.sendFetches(requestFetch); } @Test @@ -3386,6 +3394,71 @@ public void testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInform assertTrue(subscriptions.isFetchable(tp1)); } + @Test + public void testPollWithoutCreateFetchRequests() { + buildFetcher(); + + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + + assertEquals(0, sendFetches(false)); + } + + @Test + public void testPollWithCreateFetchRequests() { + buildFetcher(); + + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + + CompletableFuture future = fetcher.createFetchRequests(); + assertNotNull(future); + assertFalse(future.isDone()); + + assertEquals(1, sendFetches(false)); + assertTrue(future.isDone()); + + assertEquals(0, sendFetches(false)); + } + + @Test + public void testPollWithCreateFetchRequestsError() { + buildFetcher(); + + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + + fetcher.setAuthenticationException(new AuthenticationException("Intentional error")); + CompletableFuture future = fetcher.createFetchRequests(); + assertNotNull(future); + assertFalse(future.isDone()); + + assertDoesNotThrow(() -> sendFetches(false)); + assertFutureThrows(future, AuthenticationException.class); + } + + @Test + public void testPollWithRedundantCreateFetchRequests() { + buildFetcher(); + + assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + + List> futures = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + CompletableFuture future = fetcher.createFetchRequests(); + assertNotNull(future); + futures.add(future); + } + + assertEquals(0, futures.stream().filter(CompletableFuture::isDone).count()); + + assertEquals(1, sendFetches(false)); + assertEquals(futures.size(), futures.stream().filter(CompletableFuture::isDone).count()); + + } + private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse( TopicPartition topicPartition, Errors error, @@ -3639,6 +3712,7 @@ private List collectRecordOffsets(List> records) private class TestableFetchRequestManager extends FetchRequestManager { private final FetchCollector fetchCollector; + private AuthenticationException authenticationException; public TestableFetchRequestManager(LogContext logContext, Time time, @@ -3654,11 +3728,37 @@ public TestableFetchRequestManager(LogContext logContext, this.fetchCollector = fetchCollector; } + public void setAuthenticationException(AuthenticationException authenticationException) { + this.authenticationException = authenticationException; + } + + @Override + protected boolean isUnavailable(Node node) { + if (authenticationException != null) + return true; + + return super.isUnavailable(node); + } + + @Override + protected void maybeThrowAuthFailure(Node node) { + if (authenticationException != null) { + AuthenticationException e = authenticationException; + authenticationException = null; + throw e; + } + + super.maybeThrowAuthFailure(node); + } + private Fetch collectFetch() { return fetchCollector.collectFetch(fetchBuffer); } - private int sendFetches() { + private int sendFetches(boolean requestFetch) { + if (requestFetch) + createFetchRequests(); + NetworkClientDelegate.PollResult pollResult = poll(time.milliseconds()); networkClientDelegate.addAll(pollResult.unsentRequests); return pollResult.unsentRequests.size(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index e60a0a7005980..27f3ae4600257 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -128,6 +128,8 @@ public void testApplicationEventIsProcessed(ApplicationEvent e) { private static Stream applicationEvents() { return Stream.of( + Arguments.of(new PollEvent(100)), + Arguments.of(new CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))), Arguments.of(new AsyncCommitEvent(new HashMap<>())), Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)), Arguments.of(new CheckAndUpdatePositionsEvent(500)),