diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index cfaad3667fa40..a07b2be9084b4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -87,14 +87,15 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi private final IdempotentCloser idempotentCloser = new IdempotentCloser(); private Uuid memberId; private boolean fetchMoreRecords = false; - private final Map fetchAcknowledgementsMap; + private final Map fetchAcknowledgementsToSend; + private final Map fetchAcknowledgementsInFlight; private final Map> acknowledgeRequestStates; private final long retryBackoffMs; private final long retryBackoffMaxMs; private boolean closing = false; private final CompletableFuture closeFuture; private boolean isAcknowledgementCommitCallbackRegistered = false; - private final Map forgottenTopicNames = new HashMap<>(); + private final Map topicNamesMap = new HashMap<>(); ShareConsumeRequestManager(final Time time, final LogContext logContext, @@ -122,7 +123,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi this.sessionHandlers = new HashMap<>(); this.nodesWithPendingRequests = new HashSet<>(); this.acknowledgeRequestStates = new HashMap<>(); - this.fetchAcknowledgementsMap = new HashMap<>(); + this.fetchAcknowledgementsToSend = new HashMap<>(); + this.fetchAcknowledgementsInFlight = new HashMap<>(); this.closeFuture = new CompletableFuture<>(); } @@ -170,12 +172,14 @@ public PollResult poll(long currentTimeMs) { k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId))); TopicIdPartition tip = new TopicIdPartition(topicId, partition); - Acknowledgements acknowledgementsToSend = fetchAcknowledgementsMap.get(tip); + Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip); if (acknowledgementsToSend != null) { metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); + fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend); } handler.addPartitionToFetch(tip, acknowledgementsToSend); fetchedPartitions.add(tip); + topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); log.debug("Added fetch request for partition {} to node {}", tip, node.id()); } @@ -194,15 +198,18 @@ public PollResult poll(long currentTimeMs) { } else { for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { if (!fetchedPartitions.contains(tip)) { - Acknowledgements acknowledgementsToSend = fetchAcknowledgementsMap.get(tip); + Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip); + if (acknowledgementsToSend != null) { metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); + fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend); } + sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend); partitionsToForgetMap.putIfAbsent(node, new ArrayList<>()); partitionsToForgetMap.get(node).add(tip); - forgottenTopicNames.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); + topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); fetchedPartitions.add(tip); log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, node.id()); } @@ -253,7 +260,7 @@ public void fetch(Map acknowledgementsMap) { } // The acknowledgements sent via ShareFetch are stored in this map. - acknowledgementsMap.forEach((tip, acks) -> fetchAcknowledgementsMap.merge(tip, acks, Acknowledgements::merge)); + acknowledgementsMap.forEach((tip, acks) -> fetchAcknowledgementsToSend.merge(tip, acks, Acknowledgements::merge)); } /** @@ -566,8 +573,10 @@ public CompletableFuture acknowledgeOnClose(final Map partitions = responseData.keySet().stream().map(TopicIdPartition::topicPartition).collect(Collectors.toSet()); @@ -653,7 +662,7 @@ private void handleShareFetchSuccess(Node fetchTarget, log.debug("ShareFetch for partition {} returned fetch data {}", tip, partitionData); - Acknowledgements acks = fetchAcknowledgementsMap.remove(tip); + Acknowledgements acks = fetchAcknowledgementsInFlight.remove(tip); if (acks != null) { if (partitionData.acknowledgeErrorCode() != Errors.NONE.code()) { metricsManager.recordFailedAcknowledgements(acks.size()); @@ -716,7 +725,7 @@ private void handleShareFetchFailure(Node fetchTarget, partition.partitionIndex(), metadata.topicNames().get(topic.topicId())); - Acknowledgements acks = fetchAcknowledgementsMap.remove(tip); + Acknowledgements acks = fetchAcknowledgementsInFlight.remove(tip); if (acks != null) { metricsManager.recordFailedAcknowledgements(acks.size()); acks.setAcknowledgeErrorCode(Errors.forException(error)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index 6af9509d04b90..8ec60d4ea97ed 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -637,6 +637,52 @@ public void testRetryAcknowledgements() throws InterruptedException { assertEquals(0, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0)); } + @Test + public void testPiggybackAcknowledgementsInFlight() { + buildRequestManager(); + + assignFromSubscribed(Collections.singleton(tp0)); + + // normal fetch + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + Acknowledgements acknowledgements = Acknowledgements.empty(); + acknowledgements.add(1L, AcknowledgeType.ACCEPT); + acknowledgements.add(2L, AcknowledgeType.ACCEPT); + + // Reading records from the share fetch buffer. + fetchRecords(); + + // Piggyback acknowledgements + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements)); + + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + + assertEquals(2.0, + metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue()); + + Acknowledgements acknowledgements2 = Acknowledgements.empty(); + acknowledgements2.add(3L, AcknowledgeType.ACCEPT); + shareConsumeRequestManager.fetch(Collections.singletonMap(tip0, acknowledgements2)); + + client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); + networkClientDelegate.poll(time.timer(0)); + assertTrue(shareConsumeRequestManager.hasCompletedFetches()); + + fetchRecords(); + + assertEquals(1, sendFetches()); + assertFalse(shareConsumeRequestManager.hasCompletedFetches()); + assertEquals(3.0, + metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue()); + } + @Test public void testCommitAsyncWithSubscriptionChange() { buildRequestManager();