Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15305: The background thread should try to process the remaining task until the shutdown timer is expired. #16156

Merged
merged 7 commits into from
Jun 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,10 @@ private void closeInternal(final Duration timeout) {
* Check the unsent queue one last time and poll until all requests are sent or the timer runs out.
*/
private void sendUnsentRequests(final Timer timer) {
if (networkClientDelegate.unsentRequests().isEmpty())
return;
do {
while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests()) {
Copy link
Member

@lianetm lianetm Jun 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry a bit late but one concern: wouldn't this change make that requests that are issued with timeout 0 won't be sent out??? My understanding is that the expected behaviour for requests with timeout 0 is fire-and-forget, so we want to send them at least once.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lianetm you are right. Maybe we can keep origin code but we replace all networkClientDelegate.unsentRequests().isEmpty() by !networkClientDelegate.hasAnyPendingRequests()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly, sounds good to me: keeping the early return if to avoid poll, do-while to ensure we send out at least once, both conditions using hasAnyPendingRequests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @lianetm, thank you for pointing out this potential bug, I will fix it. 🙇🏼

networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
timer.update();
} while (timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty());
}
}

void cleanup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ public void poll(final long timeoutMs, final long currentTimeMs) {
checkDisconnects(currentTimeMs);
}

/**
* Return true if there is at least one in-flight request or unsent request.
*/
public boolean hasAnyPendingRequests() {
return client.hasInFlightRequests() || !unsentRequests.isEmpty();
}

/**
* Tries to send the requests in the unsentRequest queue. If the request doesn't have an assigned node, it will
* find the leastLoadedOne, and will be retried in the next {@code poll()}. If the request is expired, a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,27 @@ void testRunOnceInvokesReaper() {
verify(applicationEventReaper).reap(any(Long.class));
}

@Test
void testSendUnsentRequest() {
String groupId = "group-id";
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
.setKey(groupId)),
Optional.empty());

networkClient.add(request);
assertTrue(networkClient.hasAnyPendingRequests());
assertFalse(networkClient.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests());
consumerNetworkThread.cleanup();

assertTrue(networkClient.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests());
assertFalse(networkClient.hasAnyPendingRequests());
}

private void prepareOffsetCommitRequest(final Map<TopicPartition, Long> expectedOffsets,
final Errors error,
final boolean disconnected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -140,6 +141,34 @@ public void testEnsureTimerSetOnAdd() {
assertEquals(REQUEST_TIMEOUT_MS, ncd.unsentRequests().poll().timer().timeoutMs());
}

@Test
public void testHasAnyPendingRequests() throws Exception {
try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate()) {
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
networkClientDelegate.add(unsentRequest);

// unsent
assertTrue(networkClientDelegate.hasAnyPendingRequests());
assertFalse(networkClientDelegate.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests());

networkClientDelegate.poll(0, time.milliseconds());

// in-flight
assertTrue(networkClientDelegate.hasAnyPendingRequests());
assertTrue(networkClientDelegate.unsentRequests().isEmpty());
assertTrue(client.hasInFlightRequests());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to cover the whole flow, what about extending the test to ensure that when we get a response, the hasAnyPendingRequests goes false? (there's a prepareFindCoordinatorResponse that should be handy for it)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense, I will take a look 😺


client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, GROUP_ID, mockNode()));
networkClientDelegate.poll(0, time.milliseconds());

// get response
assertFalse(networkClientDelegate.hasAnyPendingRequests());
assertTrue(networkClientDelegate.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests());
}
}

public NetworkClientDelegate newNetworkClientDelegate() {
LogContext logContext = new LogContext();
Properties properties = new Properties();
Expand Down