From 743e185c8bf6d1d548fe7041ce543c7c31388e68 Mon Sep 17 00:00:00 2001 From: ShivsundarR Date: Tue, 3 Sep 2024 09:01:49 +0530 Subject: [PATCH] KAFKA-17450 Reduced the handlers for handling ShareAcknowledgeResponse (#17061) Currently there are 4 handler functions present for handling ShareAcknowledge responses. ShareConsumeRequestManager had an interface and the respective handlers would implement it. Instead of having 4 different handlers for this, now using AcknowledgeRequestType, we could merge the code and have only 2 handler functions, one for ShareAcknowledge success and one for ShareAcknowledge failure, eliminating the need for the interface. This PR also fixes a bug - We were not using the time at which the response was received while handling the ShareAcknowledge response, we were using an outdated time. Now the bug is fixed. Reviewers: Manikumar Reddy , Andrew Schofield , Chia-Ping Tsai --- .../internals/ShareConsumeRequestManager.java | 219 +++++++----------- 1 file changed, 84 insertions(+), 135 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 4c83c99a65731..173225009e9a3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -281,7 +281,7 @@ private Optional maybeBuildRequest(AcknowledgeRequestState acknow return Optional.empty(); } - UnsentRequest request = acknowledgeRequestState.buildRequest(currentTimeMs); + UnsentRequest request = acknowledgeRequestState.buildRequest(); if (request == null) { asyncDone = false; return Optional.empty(); @@ -371,8 +371,6 @@ public CompletableFuture> commitSync( sessionHandler, nodeId, acknowledgementsMapForNode, - this::handleShareAcknowledgeSuccess, - this::handleShareAcknowledgeFailure, resultHandler, AcknowledgeRequestType.COMMIT_SYNC )); @@ -417,8 +415,6 @@ public void commitAsync(final Map acknowledg sessionHandler, nodeId, acknowledgementsMapForNode, - this::handleShareAcknowledgeSuccess, - this::handleShareAcknowledgeFailure, resultHandler, AcknowledgeRequestType.COMMIT_ASYNC )); @@ -492,8 +488,6 @@ public CompletableFuture acknowledgeOnClose(final Map topic.partitions().forEach(partition -> { + TopicIdPartition tip = new TopicIdPartition(topic.topicId(), + partition.partitionIndex(), + metadata.topicNames().get(topic.topicId())); + if (partition.errorCode() != Errors.NONE.code()) { + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); + } + acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partition.errorCode())); + })); - final short requestVersion = resp.requestHeader().apiVersion(); + acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs); + acknowledgeRequestState.processingComplete(); - if (!handler.handleResponse(response, requestVersion)) { - acknowledgeRequestState.onFailedAttempt(currentTimeMs); - if (response.error().exception() instanceof RetriableException && !acknowledgeRequestState.onClose()) { - // We retry the request until the timer expires, unless we are closing. - acknowledgeRequestState.retryRequest(); + if (!closeFuture.isDone()) { + closeFuture.complete(null); + } + + metricsManager.recordLatency(resp.requestLatencyMs()); + } else { + if (!acknowledgeRequestState.sessionHandler.handleResponse(response, resp.requestHeader().apiVersion())) { + // Received a response-level error code. + acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs); + + if (response.error().exception() instanceof RetriableException) { + // We retry the request until the timer expires, unless we are closing. + acknowledgeRequestState.moveAllToIncompleteAcks(); + } else { + response.data().responses().forEach(shareAcknowledgeTopicResponse -> shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> { + TopicIdPartition tip = new TopicIdPartition(shareAcknowledgeTopicResponse.topicId(), + partitionData.partitionIndex(), + metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId())); + + acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error()); + metricsManager.recordLatency(resp.requestLatencyMs()); + })); + acknowledgeRequestState.processingComplete(); + } } else { + AtomicBoolean shouldRetry = new AtomicBoolean(false); + // Check all partition level error codes response.data().responses().forEach(shareAcknowledgeTopicResponse -> shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> { + Errors partitionError = Errors.forCode(partitionData.errorCode()); TopicIdPartition tip = new TopicIdPartition(shareAcknowledgeTopicResponse.topicId(), partitionData.partitionIndex(), metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId())); - - acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error()); - metricsManager.recordLatency(resp.requestLatencyMs()); - })); - } - } else { - AtomicBoolean shouldRetry = new AtomicBoolean(false); - // Check all partition level error codes - response.data().responses().forEach(shareAcknowledgeTopicResponse -> shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> { - Errors partitionError = Errors.forCode(partitionData.errorCode()); - TopicIdPartition tip = new TopicIdPartition(shareAcknowledgeTopicResponse.topicId(), - partitionData.partitionIndex(), - metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId())); - if (partitionError.exception() != null) { - if (partitionError.exception() instanceof RetriableException && !acknowledgeRequestState.onClose()) { - // Move to incomplete acknowledgements to retry - acknowledgeRequestState.moveToIncompleteAcks(tip); - shouldRetry.set(true); + if (partitionError.exception() != null) { + if (partitionError.exception() instanceof RetriableException) { + // Move to incomplete acknowledgements to retry + acknowledgeRequestState.moveToIncompleteAcks(tip); + shouldRetry.set(true); + } else { + metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); + acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError); + } } else { - metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError); } + })); + + if (shouldRetry.get()) { + acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs); } else { - acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError); + acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs); } - })); - - if (shouldRetry.get()) { - acknowledgeRequestState.onFailedAttempt(currentTimeMs); - } else { - acknowledgeRequestState.onSuccessfulAttempt(currentTimeMs); + acknowledgeRequestState.processingComplete(); + metricsManager.recordLatency(resp.requestLatencyMs()); } - acknowledgeRequestState.processingComplete(); } - metricsManager.recordLatency(resp.requestLatencyMs()); } finally { log.debug("Removing pending request for node {} - success", fetchTarget.id()); nodesWithPendingRequests.remove(fetchTarget.id()); + + if (acknowledgeRequestState.onClose()) { + log.debug("Removing node from ShareSession {}", fetchTarget.id()); + sessionHandlers.remove(fetchTarget.id()); + } } } @@ -676,11 +696,11 @@ private void handleShareAcknowledgeFailure(Node fetchTarget, ShareAcknowledgeRequestData requestData, AcknowledgeRequestState acknowledgeRequestState, Throwable error, - long currentTimeMs) { + long responseCompletionTimeMs) { try { log.debug("Completed ShareAcknowledge request from node {} unsuccessfully {}", fetchTarget.id(), Errors.forException(error)); acknowledgeRequestState.sessionHandler().handleError(error); - acknowledgeRequestState.onFailedAttempt(currentTimeMs); + acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs); requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> { TopicIdPartition tip = new TopicIdPartition(topic.topicId(), @@ -689,62 +709,16 @@ private void handleShareAcknowledgeFailure(Node fetchTarget, metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error)); })); - } finally { - log.debug("Removing pending request for node {} - failed", fetchTarget.id()); - nodesWithPendingRequests.remove(fetchTarget.id()); - } - } - - private void handleShareAcknowledgeCloseSuccess(Node fetchTarget, - ShareAcknowledgeRequestData requestData, - AcknowledgeRequestState acknowledgeRequestState, - ClientResponse resp, - long currentTimeMs) { - try { - log.debug("Completed ShareAcknowledge on close request from node {} successfully", fetchTarget.id()); - final ShareAcknowledgeResponse response = (ShareAcknowledgeResponse) resp.responseBody(); - - response.data().responses().forEach(topic -> topic.partitions().forEach(partition -> { - TopicIdPartition tip = new TopicIdPartition(topic.topicId(), - partition.partitionIndex(), - metadata.topicNames().get(topic.topicId())); - if (partition.errorCode() != Errors.NONE.code()) { - metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); - } - acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forCode(partition.errorCode())); - })); - acknowledgeRequestState.onSuccessfulAttempt(currentTimeMs); - metricsManager.recordLatency(resp.requestLatencyMs()); acknowledgeRequestState.processingComplete(); - } finally { - log.debug("Removing pending request for node {} - success", fetchTarget.id()); - nodesWithPendingRequests.remove(fetchTarget.id()); - sessionHandlers.remove(fetchTarget.id()); - } - } - - private void handleShareAcknowledgeCloseFailure(Node fetchTarget, - ShareAcknowledgeRequestData requestData, - AcknowledgeRequestState acknowledgeRequestState, - Throwable error, - long currentTimeMs) { - try { - log.debug("Completed ShareAcknowledge on close request from node {} unsuccessfully {}", fetchTarget.id(), Errors.forException(error)); - acknowledgeRequestState.sessionHandler().handleError(error); - acknowledgeRequestState.onFailedAttempt(currentTimeMs); - - requestData.topics().forEach(topic -> topic.partitions().forEach(partition -> { - TopicIdPartition tip = new TopicIdPartition(topic.topicId(), - partition.partitionIndex(), - metadata.topicNames().get(topic.topicId())); - metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip)); - acknowledgeRequestState.handleAcknowledgeErrorCode(tip, Errors.forException(error)); - })); } finally { log.debug("Removing pending request for node {} - failed", fetchTarget.id()); nodesWithPendingRequests.remove(fetchTarget.id()); - sessionHandlers.remove(fetchTarget.id()); + + if (acknowledgeRequestState.onClose()) { + log.debug("Removing node from ShareSession {}", fetchTarget.id()); + sessionHandlers.remove(fetchTarget.id()); + } } } @@ -803,16 +777,6 @@ public class AcknowledgeRequestState extends TimedRequestState { */ private final Map inFlightAcknowledgements; - /** - * The handler to call on a successful response from ShareAcknowledge. - */ - private final ResponseHandler successHandler; - - /** - * The handler to call on a failed response from ShareAcknowledge. - */ - private final ResponseHandler errorHandler; - /** * This handles completing a future when all results are known. */ @@ -831,15 +795,11 @@ public class AcknowledgeRequestState extends TimedRequestState { ShareSessionHandler sessionHandler, int nodeId, Map acknowledgementsMap, - ResponseHandler successHandler, - ResponseHandler errorHandler, ResultHandler resultHandler, AcknowledgeRequestType acknowledgeRequestType) { super(logContext, owner, retryBackoffMs, retryBackoffMaxMs, deadlineTimer(time, deadlineMs)); this.sessionHandler = sessionHandler; this.nodeId = nodeId; - this.successHandler = successHandler; - this.errorHandler = errorHandler; this.acknowledgementsToSend = acknowledgementsMap; this.resultHandler = resultHandler; this.inFlightAcknowledgements = new HashMap<>(); @@ -847,7 +807,7 @@ public class AcknowledgeRequestState extends TimedRequestState { this.requestType = acknowledgeRequestType; } - UnsentRequest buildRequest(long currentTimeMs) { + UnsentRequest buildRequest() { // If this is the closing request, close the share session by setting the final epoch if (onClose()) { sessionHandler.notifyClose(); @@ -868,13 +828,9 @@ UnsentRequest buildRequest(long currentTimeMs) { BiConsumer responseHandler = (clientResponse, error) -> { if (error != null) { - errorHandler.handle(nodeToSend, requestBuilder.data(), this, error, currentTimeMs); - processingComplete(); + handleShareAcknowledgeFailure(nodeToSend, requestBuilder.data(), this, error, clientResponse.receivedTimeMs()); } else { - successHandler.handle(nodeToSend, requestBuilder.data(), this, clientResponse, currentTimeMs); - if (onClose() && !closeFuture.isDone()) { - closeFuture.complete(null); - } + handleShareAcknowledgeSuccess(nodeToSend, requestBuilder.data(), this, clientResponse, clientResponse.receivedTimeMs()); } }; @@ -973,7 +929,11 @@ void processingComplete() { resultHandler.completeIfEmpty(); } - void retryRequest() { + /** + * Moves all the in-flight acknowledgements to incomplete acknowledgements to retry + * in the next request. + */ + void moveAllToIncompleteAcks() { incompleteAcknowledgements.putAll(inFlightAcknowledgements); inFlightAcknowledgements.clear(); } @@ -982,13 +942,14 @@ boolean maybeExpire() { return numAttempts > 0 && isExpired(); } + /** + * Moves the in-flight acknowledgements for a given partition to incomplete acknowledgements to retry + * in the next request. + */ public void moveToIncompleteAcks(TopicIdPartition tip) { Acknowledgements acks = inFlightAcknowledgements.remove(tip); if (acks != null) { - Acknowledgements existingAcks = incompleteAcknowledgements.putIfAbsent(tip, acks); - if (existingAcks != null) { - incompleteAcknowledgements.get(tip).merge(acks); - } + incompleteAcknowledgements.put(tip, acks); } } @@ -1001,18 +962,6 @@ public boolean onCommitAsync() { } } - /** - * Defines the contract for handling responses from brokers. - * @param Type of response, usually either {@link ClientResponse} or {@link Throwable} - */ - @FunctionalInterface - private interface ResponseHandler { - /** - * Handle the response from the given {@link Node target} - */ - void handle(Node target, ShareAcknowledgeRequestData request, AcknowledgeRequestState requestState, T response, long currentTimeMs); - } - /** * Sends a ShareAcknowledgeCommitCallback event to the application when it is done * processing all the remaining acknowledgement request states.