-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17502: Modified commitSync() and close() handling in clients #17136
Conversation
2f92f15
to
bfa0e0a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. A few comments from an initial review.
} | ||
|
||
public void setAsyncRequest(V asyncRequest) { | ||
this.asyncRequest = asyncRequest; | ||
} | ||
|
||
public void setSyncRequest(V second) { | ||
this.syncRequest = second; | ||
public void setSyncRequestQueue(Queue<V> syncRequestQueue) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could encapsulate this queue entirely in this class. For example, addSyncRequest
could create the queue if it's not yet been created.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah makes sense, I have updated the addSyncRequest() method. Still the getSyncRequestQueue() method exists though as we need the queue contents when we process the acknowledgements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fine. It just seemed weird forcing the caller to make and set the queue.
private final long retryBackoffMs; | ||
private final long retryBackoffMaxMs; | ||
private boolean closing = false; | ||
private final CompletableFuture<Void> closeFuture; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that the handling of closeFuture
is flawed. An AcknowledgeRequestState
is node-specific, but the future is associated with the entire request manager. It's only valid to complete the future once all close acks have been completed, not just the first one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, thanks, I had missed this. I am now put an extra variable to indicate if a requestState has completed processing, and then accordingly waited till all the nodes are finished and then completed the closeFuture
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm. Thanks for the PR.
…pache#17136) Currently the code in ShareConsumeRequestManager works on the basis that there can only be one commitSync()/close() at a time. But there is a chance these calls timeout on the application thread, but are still sent later on the background thread. This will mean the incoming commitSync()/close() will not be processed, resulting in possible loss of acknowledgements. To cover this case, we will now have a list of AcknowledgeRequestStates to store the commitSyncs() and a separate requestState to store the close(). This queue will be processed one by one until its empty. For close(), we are still assuming there can only be one active close() at a time. eviewers: Andrew Schofield <[email protected]>, Manikumar Reddy <[email protected]>
What
Currently the code in
ShareConsumeRequestManager
works on the basis that there can only be onecommitSync()
/close()
at a time. But there is a chance these calls timeout on the application thread, but are still sent later on the background thread. This will mean the incoming commitSync()/close() will not be processed, resulting in possible loss of acknowledgements.To cover this case, we will now have a list of
AcknowledgeRequestStates
to store thecommitSyncs()
and a separate requestState to store theclose()
. This queue will be processed one by one until its empty. Forclose()
, we are still assuming there can only be one activeclose()
at a time.