diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 173225009e9a3..e35ec87dbaef8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -50,10 +50,12 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -84,10 +86,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi private Uuid memberId; private boolean fetchMoreRecords = false; private final Map fetchAcknowledgementsMap; - private final Map> acknowledgeRequestStates; + private final Map> acknowledgeRequestStates; private final long retryBackoffMs; private final long retryBackoffMaxMs; - private boolean closing = false; private final CompletableFuture closeFuture; ShareConsumeRequestManager(final Time time, @@ -132,7 +133,7 @@ public PollResult poll(long currentTimeMs) { return pollResult; } - if (!fetchMoreRecords || closing) { + if (!fetchMoreRecords) { return PollResult.EMPTY; } @@ -216,7 +217,7 @@ public void fetch(Map acknowledgementsMap) { private PollResult processAcknowledgements(long currentTimeMs) { List unsentRequests = new ArrayList<>(); AtomicBoolean isAsyncDone = new AtomicBoolean(); - for (Map.Entry> requestStates : acknowledgeRequestStates.entrySet()) { + for (Map.Entry> requestStates : acknowledgeRequestStates.entrySet()) { int nodeId = requestStates.getKey(); if (!isNodeFree(nodeId)) { @@ -226,10 +227,25 @@ private PollResult processAcknowledgements(long currentTimeMs) { // For commitAsync maybeBuildRequest(requestStates.getValue().getAsyncRequest(), currentTimeMs, true, isAsyncDone).ifPresent(unsentRequests::add); // Check to ensure we start processing commitSync/close only if there are no commitAsync requests left to process. - if (!isNodeFree(nodeId)) { - log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId); - } else if (isAsyncDone.get()) { - maybeBuildRequest(requestStates.getValue().getSyncRequest(), currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add); + if (isAsyncDone.get()) { + // We try to process the close request only if we have processed the async and the sync requests for the node. + if (requestStates.getValue().getSyncRequestQueue() == null) { + if (!isNodeFree(nodeId)) { + log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId); + } else { + AcknowledgeRequestState closeRequestState = requestStates.getValue().getCloseRequest(); + + maybeBuildRequest(closeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add); + } + } else { + for (AcknowledgeRequestState acknowledgeRequestState : requestStates.getValue().getSyncRequestQueue()) { + if (!isNodeFree(nodeId)) { + log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId); + break; + } + maybeBuildRequest(acknowledgeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add); + } + } } } } @@ -240,14 +256,7 @@ private PollResult processAcknowledgements(long currentTimeMs) { } else if (checkAndRemoveCompletedAcknowledgements()) { // Return empty result until all the acknowledgement request states are processed pollResult = PollResult.EMPTY; - } else if (closing) { - if (!closeFuture.isDone()) { - log.trace("Completing acknowledgement on close"); - closeFuture.complete(null); - } - pollResult = PollResult.EMPTY; } - return pollResult; } @@ -298,19 +307,32 @@ private Optional maybeBuildRequest(AcknowledgeRequestState acknow /** * Prunes the empty acknowledgementRequestStates. - * Returns true if there are still some acknowledgements left to be processed. + * Returns true if there are still any acknowledgements left to be processed. + * If there is a pending/inFlight close request, we always return true. */ private boolean checkAndRemoveCompletedAcknowledgements() { boolean areAnyAcksLeft = false; - Iterator>> iterator = acknowledgeRequestStates.entrySet().iterator(); + Iterator>> iterator = acknowledgeRequestStates.entrySet().iterator(); + while (iterator.hasNext()) { - Map.Entry> acknowledgeRequestStatePair = iterator.next(); - if (isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest()) || isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getSyncRequest())) { + Map.Entry> acknowledgeRequestStatePair = iterator.next(); + boolean areAsyncAcksLeft = true, areSyncAcksLeft = true; + if (!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest())) { + acknowledgeRequestStatePair.getValue().setAsyncRequest(null); + areAsyncAcksLeft = false; + } + if (!areRequestStatesInProgress(acknowledgeRequestStatePair.getValue().getSyncRequestQueue())) { + acknowledgeRequestStatePair.getValue().setSyncRequestQueue(null); + areSyncAcksLeft = false; + } + + if (areAsyncAcksLeft || areSyncAcksLeft) { areAnyAcksLeft = true; - } else if (!closing) { + } else if (acknowledgeRequestStatePair.getValue().getCloseRequest() == null) { iterator.remove(); } } + if (!acknowledgeRequestStates.isEmpty()) areAnyAcksLeft = true; return areAnyAcksLeft; } @@ -319,6 +341,16 @@ private boolean isRequestStateInProgress(AcknowledgeRequestState acknowledgeRequ return acknowledgeRequestState != null && !(acknowledgeRequestState.isEmpty()); } + private boolean areRequestStatesInProgress(Queue acknowledgeRequestStates) { + if (acknowledgeRequestStates == null) return false; + for (AcknowledgeRequestState acknowledgeRequestState : acknowledgeRequestStates) { + if (isRequestStateInProgress(acknowledgeRequestState)) { + return true; + } + } + return false; + } + /** * Enqueue an AcknowledgeRequestState to be picked up on the next poll * @@ -340,42 +372,38 @@ public CompletableFuture> commitSync( sessionHandlers.forEach((nodeId, sessionHandler) -> { Node node = cluster.nodeById(nodeId); if (node != null) { - acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null)); + acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); - // Ensure there is no commitSync()/close() request already present as they are blocking calls - // and only one request can be active at a time. - if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) { - log.error("Attempt to call commitSync() when there is an existing sync request for node {}", node.id()); - future.completeExceptionally( - new IllegalStateException("Attempt to call commitSync() when there is an existing sync request for node : " + node.id())); - } else { - Map acknowledgementsMapForNode = new HashMap<>(); - for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { - Acknowledgements acknowledgements = acknowledgementsMap.get(tip); - if (acknowledgements != null) { - acknowledgementsMapForNode.put(tip, acknowledgements); - - metricsManager.recordAcknowledgementSent(acknowledgements.size()); - log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); - resultCount.incrementAndGet(); - } - } + // Add the incoming commitSync() request to the queue. + Map acknowledgementsMapForNode = new HashMap<>(); + for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { + Acknowledgements acknowledgements = acknowledgementsMap.get(tip); + if (acknowledgements != null) { + acknowledgementsMapForNode.put(tip, acknowledgements); + metricsManager.recordAcknowledgementSent(acknowledgements.size()); + log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); + resultCount.incrementAndGet(); + } + } - // There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state representing commitSync() and close(). - acknowledgeRequestStates.get(nodeId).setSyncRequest(new AcknowledgeRequestState(logContext, - ShareConsumeRequestManager.class.getSimpleName() + ":1", - deadlineMs, - retryBackoffMs, - retryBackoffMaxMs, - sessionHandler, - nodeId, - acknowledgementsMapForNode, - resultHandler, - AcknowledgeRequestType.COMMIT_SYNC - )); + if (acknowledgeRequestStates.get(nodeId).getSyncRequestQueue() == null) { + acknowledgeRequestStates.get(nodeId).setSyncRequestQueue(new LinkedList<>()); } + + acknowledgeRequestStates.get(nodeId).addSyncRequest(new AcknowledgeRequestState(logContext, + ShareConsumeRequestManager.class.getSimpleName() + ":1", + deadlineMs, + retryBackoffMs, + retryBackoffMaxMs, + sessionHandler, + nodeId, + acknowledgementsMapForNode, + resultHandler, + AcknowledgeRequestType.COMMIT_SYNC + )); } + }); resultHandler.completeIfEmpty(); @@ -396,7 +424,7 @@ public void commitAsync(final Map acknowledg if (node != null) { Map acknowledgementsMapForNode = new HashMap<>(); - acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null)); + acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { Acknowledgements acknowledgements = acknowledgementsMap.get(tip); @@ -448,8 +476,6 @@ public CompletableFuture acknowledgeOnClose(final Map { Node node = cluster.nodeById(nodeId); if (node != null) { @@ -470,17 +496,17 @@ public CompletableFuture acknowledgeOnClose(final Map(null, null)); + acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); - // Ensure there is no commitSync()/close() request already present as they are blocking calls + // Ensure there is no close() request already present as they are blocking calls // and only one request can be active at a time. - if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) { - log.error("Attempt to call close() when there is an existing sync request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequest()); + if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && !acknowledgeRequestStates.get(nodeId).getCloseRequest().isEmpty()) { + log.error("Attempt to call close() when there is an existing close request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequestQueue()); closeFuture.completeExceptionally( - new IllegalStateException("Attempt to call close() when there is an existing sync request for node : " + node.id())); + new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + node.id())); } else { - // There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state. - acknowledgeRequestStates.get(nodeId).setSyncRequest(new AcknowledgeRequestState(logContext, + // There can only be one close() happening at a time. So per node, there will be one acknowledge request state. + acknowledgeRequestStates.get(nodeId).setCloseRequest(new AcknowledgeRequestState(logContext, ShareConsumeRequestManager.class.getSimpleName() + ":3", deadlineMs, retryBackoffMs, @@ -626,10 +652,6 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget, acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs); acknowledgeRequestState.processingComplete(); - if (!closeFuture.isDone()) { - closeFuture.complete(null); - } - metricsManager.recordLatency(resp.requestLatencyMs()); } else { if (!acknowledgeRequestState.sessionHandler.handleResponse(response, resp.requestHeader().apiVersion())) { @@ -927,6 +949,9 @@ ShareSessionHandler sessionHandler() { void processingComplete() { inFlightAcknowledgements.clear(); resultHandler.completeIfEmpty(); + if (onClose() && !closeFuture.isDone()) { + closeFuture.complete(null); + } } /** @@ -1011,33 +1036,47 @@ public void completeIfEmpty() { } } - static class Pair { + static class Tuple { private V asyncRequest; - private V syncRequest; + private Queue syncRequestQueue; + private V closeRequest; - public Pair(V asyncRequest, V syncRequest) { + public Tuple(V asyncRequest, Queue syncRequestQueue, V closeRequest) { this.asyncRequest = asyncRequest; - this.syncRequest = syncRequest; + this.syncRequestQueue = syncRequestQueue; + this.closeRequest = closeRequest; } public void setAsyncRequest(V asyncRequest) { this.asyncRequest = asyncRequest; } - public void setSyncRequest(V second) { - this.syncRequest = second; + public void setSyncRequestQueue(Queue syncRequestQueue) { + this.syncRequestQueue = syncRequestQueue; + } + + public void addSyncRequest(V syncRequest) { + this.syncRequestQueue.add(syncRequest); + } + + public void setCloseRequest(V closeRequest) { + this.closeRequest = closeRequest; } public V getAsyncRequest() { return asyncRequest; } - public V getSyncRequest() { - return syncRequest; + public Queue getSyncRequestQueue() { + return syncRequestQueue; + } + + public V getCloseRequest() { + return closeRequest; } } - Pair requestStates(int nodeId) { + Tuple requestStates(int nodeId) { return acknowledgeRequestStates.get(nodeId); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 9977d61fe142f..04e40cb3925ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -710,11 +710,11 @@ public Map> commitSync(final Duration completedAcknowledgements.forEach((tip, acks) -> { Errors ackErrorCode = acks.getAcknowledgeErrorCode(); if (ackErrorCode == null) { - result.put(tip, null); + result.put(tip, Optional.empty()); } else { ApiException exception = ackErrorCode.exception(); if (exception == null) { - result.put(tip, null); + result.put(tip, Optional.empty()); } else { result.put(tip, Optional.of(ackErrorCode.exception())); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 2da213a467507..e0d0d0a9ef141 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -500,7 +500,8 @@ public void testPendingCommitAsyncBeforeCommitSync() { shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements2), 60000L); assertEquals(3, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0)); - assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequest().getAcknowledgementsToSendCount(tip0)); + assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size()); + assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getAcknowledgementsToSendCount(tip0)); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); @@ -510,16 +511,18 @@ public void testPendingCommitAsyncBeforeCommitSync() { networkClientDelegate.poll(time.timer(0)); assertEquals(0, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getInFlightAcknowledgementsCount(tip0)); - assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequest().getAcknowledgementsToSendCount(tip0)); + assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size()); + assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getAcknowledgementsToSendCount(tip0)); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); - assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0)); + assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size()); + assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0)); + assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); } @Test @@ -548,27 +551,27 @@ public void testRetryAcknowledgements() throws InterruptedException { shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements), 60000L); assertNull(shareConsumeRequestManager.requestStates(0).getAsyncRequest()); - assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getAcknowledgementsToSendCount(tip0)); + assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size()); + assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getAcknowledgementsToSendCount(tip0)); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); - - assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0)); + assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.REQUEST_TIMED_OUT)); networkClientDelegate.poll(time.timer(0)); - assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getIncompleteAcknowledgementsCount(tip0)); - assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0)); + assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0)); + assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements())); - assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0)); + assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0)); - assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getIncompleteAcknowledgementsCount(tip0)); + assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); + assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0)); } @Test @@ -1122,7 +1125,7 @@ private int sendAcknowledgements() { return pollResult.unsentRequests.size(); } - public Pair requestStates(int nodeId) { + public Tuple requestStates(int nodeId) { return super.requestStates(nodeId); } }