-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15305: The background thread should try to process the remaining task until the shutdown timer is expired. #16156
Changes from 3 commits
4f86e67
260b88d
1ca6f46
5729ecd
8bc8670
13d038e
a4648f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -293,12 +293,10 @@ private void closeInternal(final Duration timeout) { | |
* Check the unsent queue one last time and poll until all requests are sent or the timer runs out. | ||
*/ | ||
private void sendUnsentRequests(final Timer timer) { | ||
if (networkClientDelegate.unsentRequests().isEmpty()) | ||
return; | ||
do { | ||
networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); | ||
timer.update(); | ||
} while (timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()); | ||
} while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use |
||
} | ||
|
||
void cleanup() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,6 +43,7 @@ | |
import static org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG; | ||
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; | ||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertFalse; | ||
import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
import static org.junit.jupiter.api.Assertions.assertNull; | ||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
@@ -140,6 +141,35 @@ public void testEnsureTimerSetOnAdd() { | |
assertEquals(REQUEST_TIMEOUT_MS, ncd.unsentRequests().poll().timer().timeoutMs()); | ||
} | ||
|
||
@Test | ||
public void testHasAnyPendingRequests() throws Exception { | ||
try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate()) { | ||
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); | ||
networkClientDelegate.add(unsentRequest); | ||
|
||
// unsent | ||
assertTrue(networkClientDelegate.hasAnyPendingRequests()); | ||
assertFalse(networkClientDelegate.unsentRequests().isEmpty()); | ||
assertFalse(client.hasInFlightRequests()); | ||
|
||
networkClientDelegate.poll(0, time.milliseconds()); | ||
|
||
// in-flight | ||
assertTrue(networkClientDelegate.hasAnyPendingRequests()); | ||
assertTrue(networkClientDelegate.unsentRequests().isEmpty()); | ||
assertTrue(client.hasInFlightRequests()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to cover the whole flow, what about extending the test to ensure that when we get a response, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes sense, I will take a look 😺 |
||
time.sleep(REQUEST_TIMEOUT_MS); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we mock response instead of making timeout? For example: // in-flight
assertTrue(networkClientDelegate.hasAnyPendingRequests());
assertTrue(networkClientDelegate.unsentRequests().isEmpty());
assertTrue(client.hasInFlightRequests());
// complete request
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, GROUP_ID, mockNode()));
networkClientDelegate.poll(0, time.milliseconds());
assertFalse(networkClientDelegate.hasAnyPendingRequests());
assertTrue(networkClientDelegate.unsentRequests().isEmpty());
assertFalse(client.hasInFlightRequests()); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cool, this works like a charm and is more reasonable. 🥳 |
||
|
||
// get response normally | ||
networkClientDelegate.add(unsentRequest); | ||
prepareFindCoordinatorResponse(Errors.NONE); | ||
networkClientDelegate.poll(0, time.milliseconds()); | ||
assertFalse(networkClientDelegate.hasAnyPendingRequests()); | ||
assertTrue(networkClientDelegate.unsentRequests().isEmpty()); | ||
assertFalse(client.hasInFlightRequests()); | ||
} | ||
} | ||
|
||
public NetworkClientDelegate newNetworkClientDelegate() { | ||
LogContext logContext = new LogContext(); | ||
Properties properties = new Properties(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need the early return?
If there is a request in in-flight but unSentRequestQueue is empty, we will lose the newest request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, I think we don't need this early return statement in the current situation because we also need to consider if there are any in-flight requests.
The reason why we have this statement here is because we didn't consider the in-flight requests before, so we could return if there are no unsent requests. 🤔