Skip to content

Commit

Permalink
KAFKA-15305 The background thread should try to process the remaining…
Browse files Browse the repository at this point in the history
… task until the shutdown timer is expired. (#16156)

Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
frankvicky authored and chia7712 committed Jun 4, 2024
1 parent ec278a6 commit 0e94509
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,13 @@ private void closeInternal(final Duration timeout) {
* Check the unsent queue one last time and poll until all requests are sent or the timer runs out.
*/
private void sendUnsentRequests(final Timer timer) {
if (networkClientDelegate.unsentRequests().isEmpty())
if (!networkClientDelegate.hasAnyPendingRequests())
return;

do {
networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
timer.update();
} while (timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty());
} while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests());
}

void cleanup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ public void poll(final long timeoutMs, final long currentTimeMs) {
checkDisconnects(currentTimeMs);
}

/**
* Return true if there is at least one in-flight request or unsent request.
*/
public boolean hasAnyPendingRequests() {
return client.hasInFlightRequests() || !unsentRequests.isEmpty();
}

/**
* Tries to send the requests in the unsentRequest queue. If the request doesn't have an assigned node, it will
* find the leastLoadedOne, and will be retried in the next {@code poll()}. If the request is expired, a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,27 @@ void testRunOnceInvokesReaper() {
verify(applicationEventReaper).reap(any(Long.class));
}

@Test
void testSendUnsentRequest() {
String groupId = "group-id";
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
.setKey(groupId)),
Optional.empty());

networkClient.add(request);
assertTrue(networkClient.hasAnyPendingRequests());
assertFalse(networkClient.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests());
consumerNetworkThread.cleanup();

assertTrue(networkClient.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests());
assertFalse(networkClient.hasAnyPendingRequests());
}

private void prepareOffsetCommitRequest(final Map<TopicPartition, Long> expectedOffsets,
final Errors error,
final boolean disconnected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -140,6 +141,34 @@ public void testEnsureTimerSetOnAdd() {
assertEquals(REQUEST_TIMEOUT_MS, ncd.unsentRequests().poll().timer().timeoutMs());
}

@Test
public void testHasAnyPendingRequests() throws Exception {
try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate()) {
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
networkClientDelegate.add(unsentRequest);

// unsent
assertTrue(networkClientDelegate.hasAnyPendingRequests());
assertFalse(networkClientDelegate.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests());

networkClientDelegate.poll(0, time.milliseconds());

// in-flight
assertTrue(networkClientDelegate.hasAnyPendingRequests());
assertTrue(networkClientDelegate.unsentRequests().isEmpty());
assertTrue(client.hasInFlightRequests());

client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, GROUP_ID, mockNode()));
networkClientDelegate.poll(0, time.milliseconds());

// get response
assertFalse(networkClientDelegate.hasAnyPendingRequests());
assertTrue(networkClientDelegate.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests());
}
}

public NetworkClientDelegate newNetworkClientDelegate() {
LogContext logContext = new LogContext();
Properties properties = new Properties();
Expand Down

0 comments on commit 0e94509

Please sign in to comment.