Skip to content

Commit

Permalink
KAFKA-17711: Minor cleanup changes in ShareConsumeRequestManager (#17392
Browse files Browse the repository at this point in the history
)

What
Minor cleanup and javadoc changes in ShareConsumeRequestManager.

Reviewers:  Andrew Schofield <[email protected]>,  Manikumar Reddy <[email protected]>
  • Loading branch information
ShivsundarR authored Oct 7, 2024
1 parent 3f2cb55 commit 8637b6a
Showing 1 changed file with 39 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public void fetch(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
fetchMoreRecords = true;
}

// The acknowledgements sent via ShareFetch are stored in this map.
acknowledgementsMap.forEach((tip, acks) -> fetchAcknowledgementsMap.merge(tip, acks, Acknowledgements::merge));
}

Expand All @@ -220,38 +221,38 @@ public void fetch(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
*/
private PollResult processAcknowledgements(long currentTimeMs) {
List<UnsentRequest> unsentRequests = new ArrayList<>();
AtomicBoolean isAsyncDone = new AtomicBoolean();
AtomicBoolean isAsyncSent = new AtomicBoolean();
for (Map.Entry<Integer, Tuple<AcknowledgeRequestState>> requestStates : acknowledgeRequestStates.entrySet()) {
int nodeId = requestStates.getKey();

if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
} else {
isAsyncDone.set(false);
// For commitAsync
maybeBuildRequest(requestStates.getValue().getAsyncRequest(), currentTimeMs, true, isAsyncDone).ifPresent(unsentRequests::add);
isAsyncSent.set(false);
// First, the acknowledgements from commitAsync is sent.
maybeBuildRequest(requestStates.getValue().getAsyncRequest(), currentTimeMs, true, isAsyncSent).ifPresent(unsentRequests::add);

// Check to ensure we start processing commitSync/close only if there are no commitAsync requests left to process.
if (isAsyncDone.get()) {
if (isAsyncSent.get()) {
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
continue;
}

// We try to process the close request only if we have processed the async and the sync requests for the node.
if (requestStates.getValue().getSyncRequestQueue() == null) {
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
} else {
AcknowledgeRequestState closeRequestState = requestStates.getValue().getCloseRequest();
AcknowledgeRequestState closeRequestState = requestStates.getValue().getCloseRequest();

maybeBuildRequest(closeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
}
maybeBuildRequest(closeRequestState, currentTimeMs, false, isAsyncSent).ifPresent(unsentRequests::add);
} else {
// Processing the acknowledgements from commitSync
for (AcknowledgeRequestState acknowledgeRequestState : requestStates.getValue().getSyncRequestQueue()) {
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
break;
}
maybeBuildRequest(acknowledgeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
maybeBuildRequest(acknowledgeRequestState, currentTimeMs, false, isAsyncSent).ifPresent(unsentRequests::add);
}
}
}
}

}

PollResult pollResult = null;
Expand Down Expand Up @@ -284,11 +285,20 @@ private void maybeSendShareAcknowledgeCommitCallbackEvent(Map<TopicIdPartition,
}
}

/**
*
* @param acknowledgeRequestState Contains the acknowledgements to be sent.
* @param currentTimeMs The current time in ms.
* @param onCommitAsync Boolean to denote if the acknowledgements came from a commitAsync or not.
* @param isAsyncSent Boolean to indicate if the async request has been sent.
*
* @return Returns the request if it was built.
*/
private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknowledgeRequestState,
long currentTimeMs,
boolean onCommitAsync,
AtomicBoolean isAsyncDone) {
boolean asyncDone = true;
AtomicBoolean isAsyncSent) {
boolean asyncSent = true;
try {
if (acknowledgeRequestState == null || (!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) {
return Optional.empty();
Expand All @@ -306,28 +316,29 @@ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknow

if (!acknowledgeRequestState.canSendRequest(currentTimeMs)) {
// We wait for the backoff before we can send this request.
asyncDone = false;
asyncSent = false;
return Optional.empty();
}

UnsentRequest request = acknowledgeRequestState.buildRequest();
if (request == null) {
asyncDone = false;
asyncSent = false;
return Optional.empty();
}

acknowledgeRequestState.onSendAttempt(currentTimeMs);
return Optional.of(request);
} finally {
if (onCommitAsync) {
isAsyncDone.set(asyncDone);
isAsyncSent.set(asyncSent);
}
}
}

/**
* Prunes the empty acknowledgementRequestStates.
* Returns true if there are still any acknowledgements left to be processed.
* Prunes the empty acknowledgementRequestStates in {@link #acknowledgeRequestStates}
*
* @return Returns true if there are still any acknowledgements left to be processed.
*/
private boolean checkAndRemoveCompletedAcknowledgements() {
boolean areAnyAcksLeft = false;
Expand All @@ -340,10 +351,12 @@ private boolean checkAndRemoveCompletedAcknowledgements() {
acknowledgeRequestStatePair.getValue().setAsyncRequest(null);
areAsyncAcksLeft = false;
}

if (!areRequestStatesInProgress(acknowledgeRequestStatePair.getValue().getSyncRequestQueue())) {
acknowledgeRequestStatePair.getValue().nullifySyncRequestQueue();
areSyncAcksLeft = false;
}

if (!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getCloseRequest())) {
acknowledgeRequestStatePair.getValue().setCloseRequest(null);
}
Expand Down Expand Up @@ -879,8 +892,10 @@ public class AcknowledgeRequestState extends TimedRequestState {
private final AcknowledgeRequestType requestType;

/**
* Boolean to indicate if the request has been processed,
* Boolean to indicate if the request has been processed.
* <p>
* Set to true once we process the response and do not retry the request.
* <p>
* Initialized to false every time we build a request.
*/
private boolean isProcessed;
Expand Down

0 comments on commit 8637b6a

Please sign in to comment.