Skip to content

Commit

Permalink
KAFKA-17502: Modified commitSync() and close() handling in clients (a…
Browse files Browse the repository at this point in the history
…pache#17136)

Currently the code in ShareConsumeRequestManager works on the basis that there can only be one commitSync()/close() at a time. But there is a chance these calls timeout on the application thread, but are still sent later on the background thread. This will mean the incoming commitSync()/close() will not be processed, resulting in possible loss of acknowledgements.

To cover this case, we will now have a list of AcknowledgeRequestStates to store the commitSyncs() and a separate requestState to store the close(). This queue will be processed one by one until its empty. For close(), we are still assuming there can only be one active close() at a time.

eviewers:  Andrew Schofield <[email protected]>, Manikumar Reddy <[email protected]>
  • Loading branch information
ShivsundarR authored and tedyu committed Jan 6, 2025
1 parent 441fcf1 commit a997ac5
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -84,7 +86,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
private Uuid memberId;
private boolean fetchMoreRecords = false;
private final Map<TopicIdPartition, Acknowledgements> fetchAcknowledgementsMap;
private final Map<Integer, Pair<AcknowledgeRequestState>> acknowledgeRequestStates;
private final Map<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStates;
private final long retryBackoffMs;
private final long retryBackoffMaxMs;
private boolean closing = false;
Expand Down Expand Up @@ -132,7 +134,7 @@ public PollResult poll(long currentTimeMs) {
return pollResult;
}

if (!fetchMoreRecords || closing) {
if (!fetchMoreRecords) {
return PollResult.EMPTY;
}

Expand Down Expand Up @@ -216,7 +218,7 @@ public void fetch(Map<TopicIdPartition, Acknowledgements> acknowledgementsMap) {
private PollResult processAcknowledgements(long currentTimeMs) {
List<UnsentRequest> unsentRequests = new ArrayList<>();
AtomicBoolean isAsyncDone = new AtomicBoolean();
for (Map.Entry<Integer, Pair<AcknowledgeRequestState>> requestStates : acknowledgeRequestStates.entrySet()) {
for (Map.Entry<Integer, Tuple<AcknowledgeRequestState>> requestStates : acknowledgeRequestStates.entrySet()) {
int nodeId = requestStates.getKey();

if (!isNodeFree(nodeId)) {
Expand All @@ -226,10 +228,25 @@ private PollResult processAcknowledgements(long currentTimeMs) {
// For commitAsync
maybeBuildRequest(requestStates.getValue().getAsyncRequest(), currentTimeMs, true, isAsyncDone).ifPresent(unsentRequests::add);
// Check to ensure we start processing commitSync/close only if there are no commitAsync requests left to process.
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
} else if (isAsyncDone.get()) {
maybeBuildRequest(requestStates.getValue().getSyncRequest(), currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
if (isAsyncDone.get()) {
// We try to process the close request only if we have processed the async and the sync requests for the node.
if (requestStates.getValue().getSyncRequestQueue() == null) {
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
} else {
AcknowledgeRequestState closeRequestState = requestStates.getValue().getCloseRequest();

maybeBuildRequest(closeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
}
} else {
for (AcknowledgeRequestState acknowledgeRequestState : requestStates.getValue().getSyncRequestQueue()) {
if (!isNodeFree(nodeId)) {
log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId);
break;
}
maybeBuildRequest(acknowledgeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add);
}
}
}
}
}
Expand All @@ -242,12 +259,10 @@ private PollResult processAcknowledgements(long currentTimeMs) {
pollResult = PollResult.EMPTY;
} else if (closing) {
if (!closeFuture.isDone()) {
log.trace("Completing acknowledgement on close");
closeFuture.complete(null);
}
pollResult = PollResult.EMPTY;
}

return pollResult;
}

Expand Down Expand Up @@ -298,25 +313,56 @@ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknow

/**
* Prunes the empty acknowledgementRequestStates.
* Returns true if there are still some acknowledgements left to be processed.
* Returns true if there are still any acknowledgements left to be processed.
*/
private boolean checkAndRemoveCompletedAcknowledgements() {
boolean areAnyAcksLeft = false;
Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator = acknowledgeRequestStates.entrySet().iterator();
Iterator<Map.Entry<Integer, Tuple<AcknowledgeRequestState>>> iterator = acknowledgeRequestStates.entrySet().iterator();

while (iterator.hasNext()) {
Map.Entry<Integer, Pair<AcknowledgeRequestState>> acknowledgeRequestStatePair = iterator.next();
if (isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest()) || isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getSyncRequest())) {
Map.Entry<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStatePair = iterator.next();
boolean areAsyncAcksLeft = true, areSyncAcksLeft = true;
if (!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest())) {
acknowledgeRequestStatePair.getValue().setAsyncRequest(null);
areAsyncAcksLeft = false;
}
if (!areRequestStatesInProgress(acknowledgeRequestStatePair.getValue().getSyncRequestQueue())) {
acknowledgeRequestStatePair.getValue().nullifySyncRequestQueue();
areSyncAcksLeft = false;
}
if (!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getCloseRequest())) {
acknowledgeRequestStatePair.getValue().setCloseRequest(null);
}

if (areAsyncAcksLeft || areSyncAcksLeft) {
areAnyAcksLeft = true;
} else if (!closing) {
} else if (acknowledgeRequestStatePair.getValue().getCloseRequest() == null) {
iterator.remove();
}
}

if (!acknowledgeRequestStates.isEmpty()) areAnyAcksLeft = true;
return areAnyAcksLeft;
}

private boolean isRequestStateInProgress(AcknowledgeRequestState acknowledgeRequestState) {
return acknowledgeRequestState != null && !(acknowledgeRequestState.isEmpty());
if (acknowledgeRequestState == null) {
return false;
} else if (acknowledgeRequestState.onClose()) {
return !acknowledgeRequestState.isProcessed;
} else {
return !(acknowledgeRequestState.isEmpty());
}
}

private boolean areRequestStatesInProgress(Queue<AcknowledgeRequestState> acknowledgeRequestStates) {
if (acknowledgeRequestStates == null) return false;
for (AcknowledgeRequestState acknowledgeRequestState : acknowledgeRequestStates) {
if (isRequestStateInProgress(acknowledgeRequestState)) {
return true;
}
}
return false;
}

/**
Expand All @@ -340,42 +386,34 @@ public CompletableFuture<Map<TopicIdPartition, Acknowledgements>> commitSync(
sessionHandlers.forEach((nodeId, sessionHandler) -> {
Node node = cluster.nodeById(nodeId);
if (node != null) {
acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));

// Ensure there is no commitSync()/close() request already present as they are blocking calls
// and only one request can be active at a time.
if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) {
log.error("Attempt to call commitSync() when there is an existing sync request for node {}", node.id());
future.completeExceptionally(
new IllegalStateException("Attempt to call commitSync() when there is an existing sync request for node : " + node.id()));
} else {
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
if (acknowledgements != null) {
acknowledgementsMapForNode.put(tip, acknowledgements);

metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
}
}
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));

// Add the incoming commitSync() request to the queue.
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();
for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
if (acknowledgements != null) {
acknowledgementsMapForNode.put(tip, acknowledgements);

// There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state representing commitSync() and close().
acknowledgeRequestStates.get(nodeId).setSyncRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":1",
deadlineMs,
retryBackoffMs,
retryBackoffMaxMs,
sessionHandler,
nodeId,
acknowledgementsMapForNode,
resultHandler,
AcknowledgeRequestType.COMMIT_SYNC
));
metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id());
resultCount.incrementAndGet();
}
}

acknowledgeRequestStates.get(nodeId).addSyncRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":1",
deadlineMs,
retryBackoffMs,
retryBackoffMaxMs,
sessionHandler,
nodeId,
acknowledgementsMapForNode,
resultHandler,
AcknowledgeRequestType.COMMIT_SYNC
));
}

});

resultHandler.completeIfEmpty();
Expand All @@ -396,7 +434,7 @@ public void commitAsync(final Map<TopicIdPartition, Acknowledgements> acknowledg
if (node != null) {
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode = new HashMap<>();

acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));

for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
Expand Down Expand Up @@ -470,17 +508,17 @@ public CompletableFuture<Void> acknowledgeOnClose(final Map<TopicIdPartition, Ac
}
}

acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null));
acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null));

// Ensure there is no commitSync()/close() request already present as they are blocking calls
// Ensure there is no close() request already present as they are blocking calls
// and only one request can be active at a time.
if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) {
log.error("Attempt to call close() when there is an existing sync request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequest());
if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && !acknowledgeRequestStates.get(nodeId).getCloseRequest().isEmpty()) {
log.error("Attempt to call close() when there is an existing close request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequestQueue());
closeFuture.completeExceptionally(
new IllegalStateException("Attempt to call close() when there is an existing sync request for node : " + node.id()));
new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + node.id()));
} else {
// There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state.
acknowledgeRequestStates.get(nodeId).setSyncRequest(new AcknowledgeRequestState(logContext,
// There can only be one close() happening at a time. So per node, there will be one acknowledge request state.
acknowledgeRequestStates.get(nodeId).setCloseRequest(new AcknowledgeRequestState(logContext,
ShareConsumeRequestManager.class.getSimpleName() + ":3",
deadlineMs,
retryBackoffMs,
Expand Down Expand Up @@ -626,9 +664,6 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
acknowledgeRequestState.processingComplete();

if (!closeFuture.isDone()) {
closeFuture.complete(null);
}
} else {
if (!acknowledgeRequestState.sessionHandler.handleResponse(response, resp.requestHeader().apiVersion())) {
// Received a response-level error code.
Expand Down Expand Up @@ -678,7 +713,9 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
}
}

metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
if (acknowledgeRequestState.isProcessed) {
metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
}
} finally {
log.debug("Removing pending request for node {} - success", fetchTarget.id());
nodesWithPendingRequests.remove(fetchTarget.id());
Expand Down Expand Up @@ -785,6 +822,13 @@ public class AcknowledgeRequestState extends TimedRequestState {
*/
private final AcknowledgeRequestType requestType;

/**
* Boolean to indicate if the request has been processed,
* Set to true once we process the response and do not retry the request.
* Initialized to false every time we build a request.
*/
private boolean isProcessed;

AcknowledgeRequestState(LogContext logContext,
String owner,
long deadlineMs,
Expand All @@ -803,6 +847,7 @@ public class AcknowledgeRequestState extends TimedRequestState {
this.inFlightAcknowledgements = new HashMap<>();
this.incompleteAcknowledgements = new HashMap<>();
this.requestType = acknowledgeRequestType;
this.isProcessed = false;
}

UnsentRequest buildRequest() {
Expand All @@ -823,6 +868,7 @@ UnsentRequest buildRequest() {

log.trace("Building acknowledgements to send : {}", finalAcknowledgementsToSend);
nodesWithPendingRequests.add(nodeId);
isProcessed = false;

BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
if (error != null) {
Expand Down Expand Up @@ -925,6 +971,7 @@ ShareSessionHandler sessionHandler() {
void processingComplete() {
inFlightAcknowledgements.clear();
resultHandler.completeIfEmpty();
isProcessed = true;
}

/**
Expand Down Expand Up @@ -1009,33 +1056,50 @@ public void completeIfEmpty() {
}
}

static class Pair<V> {
static class Tuple<V> {
private V asyncRequest;
private V syncRequest;
private Queue<V> syncRequestQueue;
private V closeRequest;

public Pair(V asyncRequest, V syncRequest) {
public Tuple(V asyncRequest, Queue<V> syncRequestQueue, V closeRequest) {
this.asyncRequest = asyncRequest;
this.syncRequest = syncRequest;
this.syncRequestQueue = syncRequestQueue;
this.closeRequest = closeRequest;
}

public void setAsyncRequest(V asyncRequest) {
this.asyncRequest = asyncRequest;
}

public void setSyncRequest(V second) {
this.syncRequest = second;
public void nullifySyncRequestQueue() {
this.syncRequestQueue = null;
}

public void addSyncRequest(V syncRequest) {
if (syncRequestQueue == null) {
syncRequestQueue = new LinkedList<>();
}
this.syncRequestQueue.add(syncRequest);
}

public void setCloseRequest(V closeRequest) {
this.closeRequest = closeRequest;
}

public V getAsyncRequest() {
return asyncRequest;
}

public V getSyncRequest() {
return syncRequest;
public Queue<V> getSyncRequestQueue() {
return syncRequestQueue;
}

public V getCloseRequest() {
return closeRequest;
}
}

Pair<AcknowledgeRequestState> requestStates(int nodeId) {
Tuple<AcknowledgeRequestState> requestStates(int nodeId) {
return acknowledgeRequestStates.get(nodeId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,11 +711,11 @@ public Map<TopicIdPartition, Optional<KafkaException>> commitSync(final Duration
completedAcknowledgements.forEach((tip, acks) -> {
Errors ackErrorCode = acks.getAcknowledgeErrorCode();
if (ackErrorCode == null) {
result.put(tip, null);
result.put(tip, Optional.empty());
} else {
ApiException exception = ackErrorCode.exception();
if (exception == null) {
result.put(tip, null);
result.put(tip, Optional.empty());
} else {
result.put(tip, Optional.of(ackErrorCode.exception()));
}
Expand Down
Loading

0 comments on commit a997ac5

Please sign in to comment.