KAFKA-17439: Make polling for new records an explicit action/event in the new consumer #17035
Conversation
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Thanks for the PR. I'm quite surprised to see how little code needed to change to make the fetching explicit. A couple of points.
First, I think it is better not to overload the PollEvent because that's already used in the share consumer.
Second, it seems to me that there is still the potential for over-fetching, and this will still cause churn of the fetch session cache.
In the case where the consumer is only fetching a single partition, I think it works pretty well. The set of fetchable partitions will be empty if there's buffered data, and contain the only partition in the fetch session if there is not. So, you'll only send a Fetch request when there's a need for more data and the fetch session will not churn.
In the case where the consumer is fetching more than one partition on a particular node, if a subset of the partitions is fetchable, then the fetch session will be modified by sending a Fetch request and that seems to have the potential for a lot of churn.
Of course, all of this code is in common between the legacy consumer and the async consumer. The async consumer is still very keen on fetching so I don't properly grasp why this PR would make the fetch session behaviour better.
.../main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
Thanks for the review 👍
Agreed. I've introduced a
Agreed. It aims to lessen the churn. Preventing the churn completely is a future task.
Correct.
Correct again! Any partition with buffered data at the point where the fetch request is being generated will be marked as "removed" from the broker's fetch session cache. That's the crux of the problem 😞 Something that I tend to lose sight of is the fact that it's not a foregone conclusion that a fetch session will be evicted when it has partitions removed. Of course, it will increase its eligibility for eviction if the broker hosting the fetch session is resource-constrained and invokes the eviction process.
I'm not sure I follow. This code is all specific to the
Yep—the design of the
Thanks!
@AndrewJSchofield, et al.—it can be helpful to compare the flow of
Force-pushed from 9527e8f to a8567c9
… the new consumer: Updated the FetchRequestManager to only create and enqueue fetch requests when signaled to do so by a FetchEvent.
Force-pushed from bb7efc1 to e984638
Thanks for the updates and for the explanation of the mechanism.
I think it would be appropriate to test in FetchRequestManagerTest the mechanism of the pending fetch request future in the various permutations.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchEvent.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Hey @kirktrue, thanks for the updates, some comments...
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
…en continuously anymore
Thanks for the updates!
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java
Allowing sendFetches to block in order to communicate errors, but it will suppress timeouts.
@lianetm—tests are passing and all comments have been addressed. Can you make another review pass? Thanks!
Thanks for the updates @kirktrue! Took another pass and left some comments for consideration.
prepareFetchRequests(),
this::handleFetchSuccess,
this::handleFetchFailure
);
);
pendingFetchRequestFuture.complete(null);
Do we need to complete this future also on pollOnClose? There may be a pendingFetchRequestFuture there that won't be completed. (Not that I'm seeing how leaving that future uncompleted on close would cause a problem, but it seems safer to complete it, consistent with how we do it here after pollInternal.)
I've moved the Future-handling code to pollInternal() for consistency. LMK what you think.
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -1520,6 +1523,9 @@ private Fetch<K, V> pollForFetches(Timer timer) {
return fetch;
}

// send any new fetches (won't resend pending fetches)
sendFetches(timer);
The actual poll now happens in here (the addAndGet will complete once the background thread has had one run and called fetchMgr.poll), so should the log line on ln 1538, "Polling for fetches with timeout...", be right before this?
We're not polling for the incoming responses in sendFetches(), just enqueuing the outgoing requests. This mimics the ClassicKafkaConsumer in that the requests are enqueued in its sendFetches(), but then toward the bottom of pollForFetches(), client.poll() is invoked to wait for the results of the fetch requests.
Well, the sendFetches blocks until the CreateFetchRequestsEvent completes, and that only happens on fetchMgr.poll:

kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java, line 115 in aae7e97:
pendingFetchRequestFuture.complete(null);

So when the sendFetches completes, we did poll the manager, right? (And depending on timing, maybe we did the client.poll too, which happens in the background right after polling all the managers.) That's why the log for "Polling for fetches" made sense to me before the sendFetches, but am I missing another poll happening after the log line maybe (where it is now)?
The two ConsumerDelegate implementations work differently:

- AsyncKafkaConsumer: FetchRequestManager.poll() will complete the event's Future on the background thread before it exits, i.e. before the thread starts the network I/O. Completing the Future starts the application thread racing toward logging that message and the background thread racing toward starting network I/O. I'll admit I haven't dug through the code to surmise the relative costs of each thread's work before either crosses its respective finish line to win.
- ClassicKafkaConsumer: Fetcher.sendFetchesInternal() calls ConsumerNetworkClient.send() to enqueue the request, but then it calls NetworkClient.wakeup(). Since the same ConsumerNetworkClient instance used by the consumer is also used by AbstractCoordinator.HeartbeatThread, it's technically possible that the heartbeat thread's run() method could start network I/O when it calls NetworkClient.pollNoWakeup(). Granted, that's a race the application thread is much more likely to win, given that the heartbeat thread runs much less frequently.
Here are some points to consider:

- The definition of the term "poll" as used in the log is open to interpretation. The term "poll" is everywhere, making its meaning ambiguous at any given point of use 😢
- I agree there is a race condition (for both consumers, but more likely for the new consumer) that could result in the log message being emitted after the network I/O has commenced.
- For this to pose a problem to users, there need to be other log entries that we're racing with, right? We're trying to avoid the condition where the user is confused/misled because the entries in the log are emitted in non-deterministic order.
- The log line in question is only output at level TRACE, which I assume is very rare for users to enable.

Given the above, I'm of the opinion that it's an exercise in hair splitting to alter the logging. However, I could also just change it, which would have been way less effort than researching, thinking, and composing this response 🤣
If we leave the log line as it is, what would the effect be for the user?
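The race being discussed here can be modeled outside Kafka with a small sketch. The class and log strings below are hypothetical, not FetchRequestManager code: a background thread completes a Future and then proceeds to "network I/O", while the application thread unblocks on that Future and logs, so the relative order of the two entries is non-deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Toy model of the race: completing the Future releases the application
// thread while the background thread keeps running. Illustrative only.
public class FutureRaceSketch {

    public static ConcurrentLinkedQueue<String> run() throws InterruptedException {
        CompletableFuture<Void> eventDone = new CompletableFuture<>();
        ConcurrentLinkedQueue<String> log = new ConcurrentLinkedQueue<>();

        Thread background = new Thread(() -> {
            // Stand-in for poll(): complete the event's Future first...
            eventDone.complete(null);
            // ...then start "network I/O"; the app thread may log before or
            // after this line executes.
            log.add("background: network I/O");
        });
        background.start();

        eventDone.join();                      // application thread unblocks here
        log.add("app: Polling for fetches");   // races with the background entry
        background.join();
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        // Both entries always appear; their order may vary run to run.
        System.out.println(run());
    }
}
```

Both log entries are always produced; only their interleaving is up for grabs, which is why the TRACE line's position is a cosmetic rather than correctness question.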
I surely didn't intend for you to put up that long response, he he, sorry. It's not about the log line per se; it's about the alignment on where the poll happens. The classic consumer logs "Polling for records", then calls client.poll. Whereas here we do sendFetches (which triggers the client.poll asynchronously in the background thread, because it blocks until we poll the fetch manager), then log "Polling for fetches...".

That's the diff I saw, and I just wanted to understand/align on where the poll happens: once we trigger sendFetches (blocking), the client.poll will happen in the background at any time, not controlled by the app thread. Agreed? If so, I'm OK with leaving the log unchanged, understanding it could come out after the client.poll happened.
That's the diff I saw and just wanted to understand/align on where the poll happens: once we trigger sendFetches (blocking), the client.poll will happen in the background anytime, not controlled by the app thread. Agreed?
Agreed—the background thread is going to move from calling each RequestManager's poll() method to the NetworkClient.poll() method without the intervention of the application thread.
If so I'm ok with leaving the log unchanged, understanding it could come out after the client.poll happened.
Thanks!
@@ -707,6 +708,8 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
updateAssignmentMetadataIfNeeded(timer);
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
sendFetches(timer);
At this point we may already have records in hand to return (consumed position updated), so we should be very careful not to throw any error here. But this sendFetches could throw InterruptedException because of the addAndGet, right?

Shouldn't we just do a best effort to pipeline the next requests using add instead of addAndGet? It would achieve what we want, removing the risk of errors, and it would actually align better with what the classic consumer does on this sendFetches + transmitSends:

kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, lines 327 to 329 in 140d35c:

* Poll for network IO in best-effort only trying to transmit the ready-to-send request
* Do not check any pending requests or metadata errors so that no exception should ever
* be thrown, also no wakeups be triggered and no interrupted exception either.
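The add vs. addAndGet distinction being proposed can be sketched with a toy event queue. The names below (ToyEventQueue, pollBackground) are hypothetical, not the actual ApplicationEventHandler API: addAndGet blocks the application thread on the event's future and so can surface InterruptedException, while add merely enqueues and returns, which is why it is the safer choice after records are already in hand.

```java
import java.util.concurrent.*;

// Hypothetical sketch of the add vs. addAndGet trade-off; not Kafka code.
public class ToyEventQueue {
    private final BlockingQueue<CompletableFuture<Void>> events = new LinkedBlockingQueue<>();

    // Fire-and-forget: enqueue the event and return immediately. Cannot
    // throw InterruptedException, so it is safe once records are in hand.
    public CompletableFuture<Void> add() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        events.add(future);
        return future;
    }

    // Blocking: enqueue and wait for the background thread to complete the
    // event. Propagates Interrupted/Execution/Timeout exceptions to the caller.
    public void addAndGet(long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        add().get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Simulated background-thread step: complete the oldest pending event.
    public void pollBackground() {
        CompletableFuture<Void> f = events.poll();
        if (f != null)
            f.complete(null);
    }

    public static void main(String[] args) {
        ToyEventQueue queue = new ToyEventQueue();
        // add() returns immediately even though nothing processed the event yet.
        CompletableFuture<Void> pending = queue.add();
        System.out.println("after add, done = " + pending.isDone());
        // A background step completes it later, with no app-thread blocking.
        queue.pollBackground();
        System.out.println("after background poll, done = " + pending.isDone());
    }
}
```

With add, any failure surfaces later through the future (or is simply dropped for best-effort pipelining), rather than as an exception thrown from poll() after records have been handed to the user.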
Done.
Thanks for the updates @kirktrue! Just one nit left, almost there.
*
* <ul>
* <li>
* The method will wait for confirmation of the request creation before continuing.
This is not true now for prefetching, which uses .add instead of .addAndGet; should we remove this line?
Good catch. Reworded to state that it will not wait for confirmation.
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
Thanks for all the updates @kirktrue! LGTM.
… the new consumer (apache#17035) Reviewers: Andrew Schofield <[email protected]>, Lianet Magrans <[email protected]>
Updated the FetchRequestManager to only create and enqueue fetch requests when signaled to do so by a FetchEvent.

The application thread and the background thread each contain logic that is performed if there is buffered data from a previous fetch. There's a race condition because the presence of buffered data could change between the two threads' respective checks. Right now the window for the race condition to occur is wide open; this change aims to make the window ajar.

In the ClassicKafkaConsumer, the application thread explicitly issues fetch requests (via the Fetcher class) at specific points in the Consumer.poll() cycle. Prior to this change, the AsyncKafkaConsumer would issue fetch requests independently of the user calling Consumer.poll(); fetches would happen nearly continuously as soon as any assigned partition was fetchable. With this change, the AsyncKafkaConsumer introduces a FetchEvent that signals to the background thread that a fetch request should be issued. The specific points where this is done in the Consumer.poll() cycle of the AsyncKafkaConsumer now match the ClassicKafkaConsumer. In short: this makes the AsyncKafkaConsumer act nearly identically to the ClassicKafkaConsumer in this regard.

As mentioned above, this change does not completely solve the problem related to fetch session eviction. Exactly how the window for the race condition can be shut completely is outside the scope of this change.
See KAFKA-17182.
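The gating mechanism the PR describes can be condensed into a toy sketch. The names here (ToyFetchManager, requestFetch, the requestsCreated counter) are illustrative and not the real FetchRequestManager API: the background thread's poll() only creates requests when the application thread has explicitly registered a signal, and it completes the signal's future once the request is created.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Toy sketch of fetch-request creation gated on an explicit event signal;
// illustrative only, not Kafka code.
public class ToyFetchManager {
    private final AtomicReference<CompletableFuture<Void>> pending = new AtomicReference<>();
    private int requestsCreated = 0;

    // Application thread: signal that a fetch should be issued.
    public CompletableFuture<Void> requestFetch() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.set(future);
        return future;
    }

    // Background thread: only create requests when explicitly signaled,
    // instead of on every poll of the manager.
    public void poll() {
        CompletableFuture<Void> future = pending.getAndSet(null);
        if (future == null)
            return;           // no signal: do not fetch, avoiding session churn
        requestsCreated++;    // stand-in for prepareFetchRequests()
        future.complete(null);
    }

    public int requestsCreated() {
        return requestsCreated;
    }

    public static void main(String[] args) {
        ToyFetchManager manager = new ToyFetchManager();
        manager.poll();                 // no signal yet: nothing created
        manager.requestFetch();
        manager.poll();                 // signaled: one request created
        manager.poll();                 // signal consumed: nothing more
        System.out.println(manager.requestsCreated()); // prints 1
    }
}
```

Contrast this with the pre-change behavior, where every background poll would create requests for any fetchable partition, which is what drove the near-continuous fetching and the fetch session churn discussed in the review.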