diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 5e50c3dedb6d5..d64d678d8ee8e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -50,10 +50,12 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -84,7 +86,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi private Uuid memberId; private boolean fetchMoreRecords = false; private final Map fetchAcknowledgementsMap; - private final Map> acknowledgeRequestStates; + private final Map> acknowledgeRequestStates; private final long retryBackoffMs; private final long retryBackoffMaxMs; private boolean closing = false; @@ -132,7 +134,7 @@ public PollResult poll(long currentTimeMs) { return pollResult; } - if (!fetchMoreRecords || closing) { + if (!fetchMoreRecords) { return PollResult.EMPTY; } @@ -216,7 +218,7 @@ public void fetch(Map acknowledgementsMap) { private PollResult processAcknowledgements(long currentTimeMs) { List unsentRequests = new ArrayList<>(); AtomicBoolean isAsyncDone = new AtomicBoolean(); - for (Map.Entry> requestStates : acknowledgeRequestStates.entrySet()) { + for (Map.Entry> requestStates : acknowledgeRequestStates.entrySet()) { int nodeId = requestStates.getKey(); if (!isNodeFree(nodeId)) { @@ -226,10 +228,25 @@ private PollResult processAcknowledgements(long currentTimeMs) { // For commitAsync maybeBuildRequest(requestStates.getValue().getAsyncRequest(), currentTimeMs, true, isAsyncDone).ifPresent(unsentRequests::add); // Check to ensure we start processing commitSync/close only if there are no commitAsync requests left to process. - if (!isNodeFree(nodeId)) { - log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId); - } else if (isAsyncDone.get()) { - maybeBuildRequest(requestStates.getValue().getSyncRequest(), currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add); + if (isAsyncDone.get()) { + // We try to process the close request only if we have processed the async and the sync requests for the node. + if (requestStates.getValue().getSyncRequestQueue() == null) { + if (!isNodeFree(nodeId)) { + log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId); + } else { + AcknowledgeRequestState closeRequestState = requestStates.getValue().getCloseRequest(); + + maybeBuildRequest(closeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add); + } + } else { + for (AcknowledgeRequestState acknowledgeRequestState : requestStates.getValue().getSyncRequestQueue()) { + if (!isNodeFree(nodeId)) { + log.trace("Skipping acknowledge request because previous request to {} has not been processed, so acks are not sent", nodeId); + break; + } + maybeBuildRequest(acknowledgeRequestState, currentTimeMs, false, isAsyncDone).ifPresent(unsentRequests::add); + } + } } } } @@ -242,12 +259,10 @@ private PollResult processAcknowledgements(long currentTimeMs) { pollResult = PollResult.EMPTY; } else if (closing) { if (!closeFuture.isDone()) { - log.trace("Completing acknowledgement on close"); closeFuture.complete(null); } pollResult = PollResult.EMPTY; } - return pollResult; } @@ -298,25 +313,56 @@ private Optional maybeBuildRequest(AcknowledgeRequestState acknow /** * Prunes the empty acknowledgementRequestStates. - * Returns true if there are still some acknowledgements left to be processed. + * Returns true if there are still any acknowledgements left to be processed. */ private boolean checkAndRemoveCompletedAcknowledgements() { boolean areAnyAcksLeft = false; - Iterator>> iterator = acknowledgeRequestStates.entrySet().iterator(); + Iterator>> iterator = acknowledgeRequestStates.entrySet().iterator(); + while (iterator.hasNext()) { - Map.Entry> acknowledgeRequestStatePair = iterator.next(); - if (isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest()) || isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getSyncRequest())) { + Map.Entry> acknowledgeRequestStatePair = iterator.next(); + boolean areAsyncAcksLeft = true, areSyncAcksLeft = true; + if (!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getAsyncRequest())) { + acknowledgeRequestStatePair.getValue().setAsyncRequest(null); + areAsyncAcksLeft = false; + } + if (!areRequestStatesInProgress(acknowledgeRequestStatePair.getValue().getSyncRequestQueue())) { + acknowledgeRequestStatePair.getValue().nullifySyncRequestQueue(); + areSyncAcksLeft = false; + } + if (!isRequestStateInProgress(acknowledgeRequestStatePair.getValue().getCloseRequest())) { + acknowledgeRequestStatePair.getValue().setCloseRequest(null); + } + + if (areAsyncAcksLeft || areSyncAcksLeft) { areAnyAcksLeft = true; - } else if (!closing) { + } else if (acknowledgeRequestStatePair.getValue().getCloseRequest() == null) { iterator.remove(); } } + if (!acknowledgeRequestStates.isEmpty()) areAnyAcksLeft = true; return areAnyAcksLeft; } private boolean isRequestStateInProgress(AcknowledgeRequestState acknowledgeRequestState) { - return acknowledgeRequestState != null && !(acknowledgeRequestState.isEmpty()); + if (acknowledgeRequestState == null) { + return false; + } else if (acknowledgeRequestState.onClose()) { + return !acknowledgeRequestState.isProcessed; + } else { + return !(acknowledgeRequestState.isEmpty()); + } + } + + private boolean areRequestStatesInProgress(Queue acknowledgeRequestStates) { + if (acknowledgeRequestStates == null) return false; + for (AcknowledgeRequestState acknowledgeRequestState : acknowledgeRequestStates) { + if (isRequestStateInProgress(acknowledgeRequestState)) { + return true; + } + } + return false; } /** @@ -340,42 +386,34 @@ public CompletableFuture> commitSync( sessionHandlers.forEach((nodeId, sessionHandler) -> { Node node = cluster.nodeById(nodeId); if (node != null) { - acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null)); - - // Ensure there is no commitSync()/close() request already present as they are blocking calls - // and only one request can be active at a time. - if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) { - log.error("Attempt to call commitSync() when there is an existing sync request for node {}", node.id()); - future.completeExceptionally( - new IllegalStateException("Attempt to call commitSync() when there is an existing sync request for node : " + node.id())); - } else { - Map acknowledgementsMapForNode = new HashMap<>(); - for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { - Acknowledgements acknowledgements = acknowledgementsMap.get(tip); - if (acknowledgements != null) { - acknowledgementsMapForNode.put(tip, acknowledgements); - - metricsManager.recordAcknowledgementSent(acknowledgements.size()); - log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); - resultCount.incrementAndGet(); - } - } + acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); + // Add the incoming commitSync() request to the queue. + Map acknowledgementsMapForNode = new HashMap<>(); + for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { + Acknowledgements acknowledgements = acknowledgementsMap.get(tip); + if (acknowledgements != null) { + acknowledgementsMapForNode.put(tip, acknowledgements); - // There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state representing commitSync() and close(). - acknowledgeRequestStates.get(nodeId).setSyncRequest(new AcknowledgeRequestState(logContext, - ShareConsumeRequestManager.class.getSimpleName() + ":1", - deadlineMs, - retryBackoffMs, - retryBackoffMaxMs, - sessionHandler, - nodeId, - acknowledgementsMapForNode, - resultHandler, - AcknowledgeRequestType.COMMIT_SYNC - )); + metricsManager.recordAcknowledgementSent(acknowledgements.size()); + log.debug("Added sync acknowledge request for partition {} to node {}", tip.topicPartition(), node.id()); + resultCount.incrementAndGet(); + } } + + acknowledgeRequestStates.get(nodeId).addSyncRequest(new AcknowledgeRequestState(logContext, + ShareConsumeRequestManager.class.getSimpleName() + ":1", + deadlineMs, + retryBackoffMs, + retryBackoffMaxMs, + sessionHandler, + nodeId, + acknowledgementsMapForNode, + resultHandler, + AcknowledgeRequestType.COMMIT_SYNC + )); } + }); resultHandler.completeIfEmpty(); @@ -396,7 +434,7 @@ public void commitAsync(final Map acknowledg if (node != null) { Map acknowledgementsMapForNode = new HashMap<>(); - acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, null)); + acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { Acknowledgements acknowledgements = acknowledgementsMap.get(tip); @@ -470,17 +508,17 @@ public CompletableFuture acknowledgeOnClose(final Map(null, null)); + acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, null, null)); - // Ensure there is no commitSync()/close() request already present as they are blocking calls + // Ensure there is no close() request already present as they are blocking calls // and only one request can be active at a time. - if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != null && !acknowledgeRequestStates.get(nodeId).getSyncRequest().isEmpty()) { - log.error("Attempt to call close() when there is an existing sync request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequest()); + if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null && !acknowledgeRequestStates.get(nodeId).getCloseRequest().isEmpty()) { + log.error("Attempt to call close() when there is an existing close request for node {}-{}", node.id(), acknowledgeRequestStates.get(nodeId).getSyncRequestQueue()); closeFuture.completeExceptionally( - new IllegalStateException("Attempt to call close() when there is an existing sync request for node : " + node.id())); + new IllegalStateException("Attempt to call close() when there is an existing close request for node : " + node.id())); } else { - // There can only be one commitSync()/close() happening at a time. So per node, there will be one acknowledge request state. - acknowledgeRequestStates.get(nodeId).setSyncRequest(new AcknowledgeRequestState(logContext, + // There can only be one close() happening at a time. So per node, there will be one acknowledge request state. + acknowledgeRequestStates.get(nodeId).setCloseRequest(new AcknowledgeRequestState(logContext, ShareConsumeRequestManager.class.getSimpleName() + ":3", deadlineMs, retryBackoffMs, @@ -626,9 +664,6 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget, acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs); acknowledgeRequestState.processingComplete(); - if (!closeFuture.isDone()) { - closeFuture.complete(null); - } } else { if (!acknowledgeRequestState.sessionHandler.handleResponse(response, resp.requestHeader().apiVersion())) { // Received a response-level error code. @@ -678,7 +713,9 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget, } } - metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs()); + if (acknowledgeRequestState.isProcessed) { + metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs()); + } } finally { log.debug("Removing pending request for node {} - success", fetchTarget.id()); nodesWithPendingRequests.remove(fetchTarget.id()); @@ -785,6 +822,13 @@ public class AcknowledgeRequestState extends TimedRequestState { */ private final AcknowledgeRequestType requestType; + /** + * Boolean to indicate if the request has been processed, + * Set to true once we process the response and do not retry the request. + * Initialized to false every time we build a request. + */ + private boolean isProcessed; + AcknowledgeRequestState(LogContext logContext, String owner, long deadlineMs, @@ -803,6 +847,7 @@ public class AcknowledgeRequestState extends TimedRequestState { this.inFlightAcknowledgements = new HashMap<>(); this.incompleteAcknowledgements = new HashMap<>(); this.requestType = acknowledgeRequestType; + this.isProcessed = false; } UnsentRequest buildRequest() { @@ -823,6 +868,7 @@ UnsentRequest buildRequest() { log.trace("Building acknowledgements to send : {}", finalAcknowledgementsToSend); nodesWithPendingRequests.add(nodeId); + isProcessed = false; BiConsumer responseHandler = (clientResponse, error) -> { if (error != null) { @@ -925,6 +971,7 @@ ShareSessionHandler sessionHandler() { void processingComplete() { inFlightAcknowledgements.clear(); resultHandler.completeIfEmpty(); + isProcessed = true; } /** @@ -1009,33 +1056,50 @@ public void completeIfEmpty() { } } - static class Pair { + static class Tuple { private V asyncRequest; - private V syncRequest; + private Queue syncRequestQueue; + private V closeRequest; - public Pair(V asyncRequest, V syncRequest) { + public Tuple(V asyncRequest, Queue syncRequestQueue, V closeRequest) { this.asyncRequest = asyncRequest; - this.syncRequest = syncRequest; + this.syncRequestQueue = syncRequestQueue; + this.closeRequest = closeRequest; } public void setAsyncRequest(V asyncRequest) { this.asyncRequest = asyncRequest; } - public void setSyncRequest(V second) { - this.syncRequest = second; + public void nullifySyncRequestQueue() { + this.syncRequestQueue = null; + } + + public void addSyncRequest(V syncRequest) { + if (syncRequestQueue == null) { + syncRequestQueue = new LinkedList<>(); + } + this.syncRequestQueue.add(syncRequest); + } + + public void setCloseRequest(V closeRequest) { + this.closeRequest = closeRequest; } public V getAsyncRequest() { return asyncRequest; } - public V getSyncRequest() { - return syncRequest; + public Queue getSyncRequestQueue() { + return syncRequestQueue; + } + + public V getCloseRequest() { + return closeRequest; } } - Pair requestStates(int nodeId) { + Tuple requestStates(int nodeId) { return acknowledgeRequestStates.get(nodeId); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 091e0c6d99b27..bee90b17fd9c3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -711,11 +711,11 @@ public Map> commitSync(final Duration completedAcknowledgements.forEach((tip, acks) -> { Errors ackErrorCode = acks.getAcknowledgeErrorCode(); if (ackErrorCode == null) { - result.put(tip, null); + result.put(tip, Optional.empty()); } else { ApiException exception = ackErrorCode.exception(); if (exception == null) { - result.put(tip, null); + result.put(tip, Optional.empty()); } else { result.put(tip, Optional.of(ackErrorCode.exception())); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 2da213a467507..e0d0d0a9ef141 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -500,7 +500,8 @@ public void testPendingCommitAsyncBeforeCommitSync() { shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements2), 60000L); assertEquals(3, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0)); - assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequest().getAcknowledgementsToSendCount(tip0)); + assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size()); + assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getAcknowledgementsToSendCount(tip0)); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); @@ -510,16 +511,18 @@ public void testPendingCommitAsyncBeforeCommitSync() { networkClientDelegate.poll(time.timer(0)); assertEquals(0, shareConsumeRequestManager.requestStates(0).getAsyncRequest().getInFlightAcknowledgementsCount(tip0)); - assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequest().getAcknowledgementsToSendCount(tip0)); + assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size()); + assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getAcknowledgementsToSendCount(tip0)); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); - assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0)); + assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size()); + assertEquals(3, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0)); + assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); } @Test @@ -548,27 +551,27 @@ public void testRetryAcknowledgements() throws InterruptedException { shareConsumeRequestManager.commitSync(Collections.singletonMap(tip0, acknowledgements), 60000L); assertNull(shareConsumeRequestManager.requestStates(0).getAsyncRequest()); - assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getAcknowledgementsToSendCount(tip0)); + assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size()); + assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getAcknowledgementsToSendCount(tip0)); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); - - assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0)); + assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.REQUEST_TIMED_OUT)); networkClientDelegate.poll(time.timer(0)); - assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getIncompleteAcknowledgementsCount(tip0)); - assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0)); + assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0)); + assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, shareConsumeRequestManager.sendAcknowledgements())); - assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0)); + assertEquals(6, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getInFlightAcknowledgementsCount(tip0)); - assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequest().getIncompleteAcknowledgementsCount(tip0)); + assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0)); + assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0)); } @Test @@ -1122,7 +1125,7 @@ private int sendAcknowledgements() { return pollResult.unsentRequests.size(); } - public Pair requestStates(int nodeId) { + public Tuple requestStates(int nodeId) { return super.requestStates(nodeId); } }