diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index adee6594603bb..7616ac6912289 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -293,12 +293,13 @@ private void closeInternal(final Duration timeout) { * Check the unsent queue one last time and poll until all requests are sent or the timer runs out. */ private void sendUnsentRequests(final Timer timer) { - if (networkClientDelegate.unsentRequests().isEmpty()) + if (!networkClientDelegate.hasAnyPendingRequests()) return; + do { networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); timer.update(); - } while (timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()); + } while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests()); } void cleanup() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 141f5f955c8b5..2cd6f6d853049 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -130,6 +130,13 @@ public void poll(final long timeoutMs, final long currentTimeMs) { checkDisconnects(currentTimeMs); } + /** + * Return true if there is at least one in-flight request or unsent request. + */ + public boolean hasAnyPendingRequests() { + return client.hasInFlightRequests() || !unsentRequests.isEmpty(); + } + /** * Tries to send the requests in the unsentRequest queue. If the request doesn't have an assigned node, it will * find the leastLoadedOne, and will be retried in the next {@code poll()}. If the request is expired, a diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index 8c3f97dd64379..b6780fd42736f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -339,6 +339,27 @@ void testRunOnceInvokesReaper() { verify(applicationEventReaper).reap(any(Long.class)); } + @Test + void testSendUnsentRequest() { + String groupId = "group-id"; + NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( + new FindCoordinatorRequest.Builder( + new FindCoordinatorRequestData() + .setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()) + .setKey(groupId)), + Optional.empty()); + + networkClient.add(request); + assertTrue(networkClient.hasAnyPendingRequests()); + assertFalse(networkClient.unsentRequests().isEmpty()); + assertFalse(client.hasInFlightRequests()); + consumerNetworkThread.cleanup(); + + assertTrue(networkClient.unsentRequests().isEmpty()); + assertFalse(client.hasInFlightRequests()); + assertFalse(networkClient.hasAnyPendingRequests()); + } + private void prepareOffsetCommitRequest(final Map expectedOffsets, final Errors error, final boolean disconnected) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java index 70f33bfdf451e..7d53ca385b8e3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java @@ -43,6 +43,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -140,6 +141,34 @@ public void testEnsureTimerSetOnAdd() { assertEquals(REQUEST_TIMEOUT_MS, ncd.unsentRequests().poll().timer().timeoutMs()); } + @Test + public void testHasAnyPendingRequests() throws Exception { + try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate()) { + NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); + networkClientDelegate.add(unsentRequest); + + // unsent + assertTrue(networkClientDelegate.hasAnyPendingRequests()); + assertFalse(networkClientDelegate.unsentRequests().isEmpty()); + assertFalse(client.hasInFlightRequests()); + + networkClientDelegate.poll(0, time.milliseconds()); + + // in-flight + assertTrue(networkClientDelegate.hasAnyPendingRequests()); + assertTrue(networkClientDelegate.unsentRequests().isEmpty()); + assertTrue(client.hasInFlightRequests()); + + client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, GROUP_ID, mockNode())); + networkClientDelegate.poll(0, time.milliseconds()); + + // get response + assertFalse(networkClientDelegate.hasAnyPendingRequests()); + assertTrue(networkClientDelegate.unsentRequests().isEmpty()); + assertFalse(client.hasInFlightRequests()); + } + } + public NetworkClientDelegate newNetworkClientDelegate() { LogContext logContext = new LogContext(); Properties properties = new Properties();