From 331008cdf1eb73f450e436992acd40c9a98a6b35 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 6 Oct 2022 21:05:10 +0800 Subject: [PATCH] [refactor][java] Unify the acknowledge process for batch and non-batch message IDs (#17833) (cherry picked from commit 83309edab53e090c4126dca8a46b6a5499a3b257) --- .../client/impl/ConsumerAckResponseTest.java | 100 ------- .../pulsar/client/impl/ConsumerAckTest.java | 256 ++++++++++++++++++ .../client/impl/BatchMessageIdImpl.java | 6 + ...sistentAcknowledgmentsGroupingTracker.java | 139 +++++----- 4 files changed, 333 insertions(+), 168 deletions(-) delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java deleted file mode 100644 index f86bbabdd8808..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckResponseTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.client.impl; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.testng.Assert.fail; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import lombok.Cleanup; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConsumerBase; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.transaction.TransactionImpl; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -@Test(groups = "broker-impl") -public class ConsumerAckResponseTest extends ProducerConsumerBase { - - private TransactionImpl transaction; - - @BeforeClass(alwaysRun = true) - public void setup() throws Exception { - super.internalSetup(); - super.producerBaseSetup(); - transaction = mock(TransactionImpl.class); - doReturn(1L).when(transaction).getTxnIdLeastBits(); - doReturn(1L).when(transaction).getTxnIdMostBits(); - doReturn(TransactionImpl.State.OPEN).when(transaction).getState(); - CompletableFuture completableFuture = CompletableFuture.completedFuture(null); - doNothing().when(transaction).registerAckOp(any()); - doReturn(true).when(transaction).checkIfOpen(any()); - doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any()); - - Thread.sleep(1000 * 3); - } - - @AfterClass(alwaysRun = true) - public void cleanup() throws Exception { - super.internalCleanup(); - } - - @Test - public void testAckResponse() throws PulsarClientException, InterruptedException { - String topic = "testAckResponse"; - @Cleanup - Producer producer = pulsarClient.newProducer(Schema.INT32) - .topic(topic) - .enableBatching(false) - .create(); - @Cleanup - ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32) - .topic(topic) - .subscriptionName("sub") - .subscriptionType(SubscriptionType.Shared) - .ackTimeout(1, TimeUnit.SECONDS) - .subscribe(); - producer.send(1); - producer.send(2); - try { - consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1), transaction).get(); - fail(); - } catch (ExecutionException e) { - Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException); - } - Message message = consumer.receive(); - - try { - consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); - fail(); - } catch (ExecutionException e) { - Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException); - } - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java new file mode 100644 index 0000000000000..ea2815641f1b7 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import lombok.AllArgsConstructor; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerInterceptor; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.transaction.TransactionImpl; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import org.testng.collections.Sets; + +@Slf4j +@Test(groups = "broker-impl") +public class ConsumerAckTest extends ProducerConsumerBase { + + private TransactionImpl transaction; + private PulsarClient clientWithStats; + + @BeforeClass(alwaysRun = true) + public void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + this.clientWithStats = newPulsarClient(lookupUrl.toString(), 30); + transaction = mock(TransactionImpl.class); + doReturn(1L).when(transaction).getTxnIdLeastBits(); + doReturn(1L).when(transaction).getTxnIdMostBits(); + doReturn(TransactionImpl.State.OPEN).when(transaction).getState(); + CompletableFuture completableFuture = CompletableFuture.completedFuture(null); + doNothing().when(transaction).registerAckOp(any()); + doReturn(true).when(transaction).checkIfOpen(any()); + doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any()); + } + + @AfterClass(alwaysRun = true) + public void cleanup() throws Exception { + this.clientWithStats.close(); + super.internalCleanup(); + } + + @Test + public void testAckResponse() throws PulsarClientException, InterruptedException { + String topic = "testAckResponse"; + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + @Cleanup + ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .subscribe(); + producer.send(1); + producer.send(2); + try { + consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1), transaction).get(); + fail(); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException); + } + Message message = consumer.receive(); + + try { + consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); + fail(); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException); + } + } + + @Test + public void testIndividualAck() throws Exception { + @Cleanup AckTestData data = prepareDataForAck("test-individual-ack"); + for (MessageId messageId : data.messageIds) { + data.consumer.acknowledge(messageId); + } + assertEquals(data.interceptor.individualAckedMessageIdList, data.messageIds); + assertEquals(data.consumer.getStats().getNumAcksSent(), data.size()); + assertTrue(data.consumer.getUnAckedMessageTracker().isEmpty()); + } + + @Test + public void testIndividualAckList() throws Exception { + @Cleanup AckTestData data = prepareDataForAck("test-individual-ack-list"); + data.consumer.acknowledge(data.messageIds); + assertEquals(data.interceptor.individualAckedMessageIdList, data.messageIds); + assertEquals(data.consumer.getStats().getNumAcksSent(), data.size()); + assertTrue(data.consumer.getUnAckedMessageTracker().isEmpty()); + } + + @Test + public void testCumulativeAck() throws Exception { + @Cleanup AckTestData data = prepareDataForAck("test-cumulative-ack"); + System.out.println(data.size()); + data.consumer.acknowledgeCumulative(data.messageIds.get(data.size() - 1)); + assertEquals(data.interceptor.cumulativeAckedMessageIdList.get(0), + data.messageIds.get(data.messageIds.size() - 1)); + assertEquals(data.consumer.getStats().getNumAcksSent(), 2); + assertTrue(data.consumer.getUnAckedMessageTracker().isEmpty()); + } + + // Send 1 non-batched message, then send N-1 messages that are in the same batch + private AckTestData prepareDataForAck(String topic) throws PulsarClientException { + final int numMessages = 10; + @Cleanup Producer batchProducer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(true) + .batchingMaxMessages(numMessages - 1) + .batchingMaxPublishDelay(1, TimeUnit.SECONDS) + .create(); + @Cleanup Producer nonBatchProducer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .create(); + AckStatsInterceptor interceptor = new AckStatsInterceptor(); + ConsumerImpl consumer = (ConsumerImpl) clientWithStats.newConsumer(Schema.STRING).topic(topic) + .subscriptionName("sub").intercept(interceptor).ackTimeout(10, TimeUnit.SECONDS).subscribe(); + + nonBatchProducer.send("msg-0"); + for (int i = 1; i < numMessages; i++) { + batchProducer.sendAsync("msg-" + i); + } + List messageIds = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Message message = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(message); + messageIds.add(message.getMessageId()); + } + MessageId firstEntryMessageId = messageIds.get(0); + MessageId secondEntryMessageId = ((BatchMessageIdImpl) messageIds.get(1)).toMessageIdImpl(); + // Verify messages 2 to N must be in the same entry + for (int i = 2; i < messageIds.size(); i++) { + assertEquals(((BatchMessageIdImpl) messageIds.get(i)).toMessageIdImpl(), secondEntryMessageId); + } + + assertTrue(interceptor.individualAckedMessageIdList.isEmpty()); + assertTrue(interceptor.cumulativeAckedMessageIdList.isEmpty()); + assertEquals(consumer.getStats().getNumAcksSent(), 0); + assertNotNull(consumer.getUnAckedMessageTracker().messageIdPartitionMap); + assertEquals(consumer.getUnAckedMessageTracker().messageIdPartitionMap.keySet(), + Sets.newHashSet(firstEntryMessageId, secondEntryMessageId)); + return new AckTestData(consumer, interceptor, messageIds); + } + + // Send 10 messages, the 1st message is a non-batched message, the other messages are in the same batch + @AllArgsConstructor + private static class AckTestData implements Closeable { + + private final ConsumerImpl consumer; + private final AckStatsInterceptor interceptor; + private final List messageIds; + + public int size() { + return messageIds.size(); + } + + @Override + public void close() throws IOException { + interceptor.close(); + consumer.close(); + } + } + + private static class AckStatsInterceptor implements ConsumerInterceptor { + + private final List individualAckedMessageIdList = new CopyOnWriteArrayList<>(); + private final List cumulativeAckedMessageIdList = new CopyOnWriteArrayList<>(); + + @Override + public void close() { + // No ops + } + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + return message; + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable exception) { + if (exception != null) { + log.error("[{}] Failed to acknowledge {}", consumer.getConsumerName(), messageId); + return; + } + individualAckedMessageIdList.add(messageId); + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable exception) { + if (exception != null) { + log.error("[{}] Failed to acknowledge {}", consumer.getConsumerName(), messageId); + return; + } + cumulativeAckedMessageIdList.add(messageId); + } + + @Override + public void onNegativeAcksSend(Consumer consumer, Set messageIds) { + // No ops + } + + @Override + public void onAckTimeoutSend(Consumer consumer, Set messageIds) { + // No ops + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index 104d36b4b2fae..ee25d504cf9fe 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -147,6 +147,12 @@ public MessageIdImpl prevBatchMessageId() { ledgerId, entryId - 1, partitionIndex); } + // MessageIdImpl is widely used as the key of a hash map, in this case, we should convert the batch message id to + // have the correct hash code. + public MessageIdImpl toMessageIdImpl() { + return new MessageIdImpl(ledgerId, entryId, partitionIndex); + } + public BatchMessageAcker getAcker() { return acker; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 9829babece74b..ba944b2d83a29 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -35,6 +35,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import javax.annotation.Nullable; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Triple; @@ -162,18 +164,20 @@ public CompletableFuture addListAcknowledgment(List messageIds, private void addListAcknowledgment(List messageIds) { for (MessageId messageId : messageIds) { - consumer.onAcknowledge(messageId, null); if (messageId instanceof BatchMessageIdImpl) { BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; - if (!batchMessageId.ackIndividual()) { - doIndividualBatchAckAsync((BatchMessageIdImpl) messageId); - } else { - messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId); - doIndividualAckAsync((MessageIdImpl) messageId); - } + addIndividualAcknowledgment(batchMessageId.toMessageIdImpl(), + batchMessageId, + this::doIndividualAckAsync, + this::doIndividualBatchAckAsync); + } else if (messageId instanceof MessageIdImpl) { + addIndividualAcknowledgment((MessageIdImpl) messageId, + null, + this::doIndividualAckAsync, + this::doIndividualBatchAckAsync); } else { - modifyMessageIdStatesInConsumer((MessageIdImpl) messageId); - doIndividualAckAsync((MessageIdImpl) messageId); + throw new IllegalStateException("Unsupported message id type in addListAcknowledgement: " + + messageId.getClass().getCanonicalName()); } } } @@ -183,67 +187,65 @@ public CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ac Map properties) { if (msgId instanceof BatchMessageIdImpl) { BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId; - if (ackType == AckType.Individual) { - consumer.onAcknowledge(msgId, null); - // ack this ack carry bitSet index and judge bit set are all ack - if (batchMessageId.ackIndividual()) { - MessageIdImpl messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId); - return doIndividualAck(messageId, properties); - } else if (batchIndexAckEnabled){ - return doIndividualBatchAck(batchMessageId, properties); - } else { - // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are - // all ack complete - return CompletableFuture.completedFuture(null); - } - } else { - consumer.onAcknowledgeCumulative(msgId, null); - if (batchMessageId.ackCumulative()) { - return doCumulativeAck(msgId, properties, null); - } else { - if (batchIndexAckEnabled) { - return doCumulativeBatchIndexAck(batchMessageId, properties); - } else { - // ack the pre messageId, because we prevent the batchIndexAck, we can ensure pre messageId can - // ack - if (AckType.Cumulative == ackType - && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) { - doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null); - batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true); - } - return CompletableFuture.completedFuture(null); - } - } - } + return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId); } else { - if (ackType == AckType.Individual) { - consumer.onAcknowledge(msgId, null); - modifyMessageIdStatesInConsumer(msgId); - return doIndividualAck(msgId, properties); - } else { - consumer.onAcknowledgeCumulative(msgId, null); - return doCumulativeAck(msgId, properties, null); - } + return addAcknowledgment(msgId, ackType, properties, null); } } - private MessageIdImpl modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) { - MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(), - batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()); - consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize()); - clearMessageIdFromUnAckTrackerAndDeadLetter(messageId); - return messageId; - } - - private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) { - consumer.getStats().incrementNumAcksSent(1); - clearMessageIdFromUnAckTrackerAndDeadLetter(messageId); + private CompletableFuture addIndividualAcknowledgment( + MessageIdImpl msgId, + @Nullable BatchMessageIdImpl batchMessageId, + Function> individualAckFunction, + Function> batchAckFunction) { + if (batchMessageId != null) { + consumer.onAcknowledge(batchMessageId, null); + } else { + consumer.onAcknowledge(msgId, null); + } + if (batchMessageId == null || batchMessageId.ackIndividual()) { + consumer.getStats().incrementNumAcksSent((batchMessageId != null) ? batchMessageId.getBatchSize() : 1); + consumer.getUnAckedMessageTracker().remove(msgId); + if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) { + consumer.getPossibleSendToDeadLetterTopicMessages().remove(msgId); + } + return individualAckFunction.apply(msgId); + } else if (batchIndexAckEnabled) { + return batchAckFunction.apply(batchMessageId); + } else { + return CompletableFuture.completedFuture(null); + } } - private void clearMessageIdFromUnAckTrackerAndDeadLetter(MessageIdImpl messageId) { - consumer.getUnAckedMessageTracker().remove(messageId); - if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) { - consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId); + private CompletableFuture addAcknowledgment(MessageIdImpl msgId, + AckType ackType, + Map properties, + @Nullable BatchMessageIdImpl batchMessageId) { + switch (ackType) { + case Individual: + return addIndividualAcknowledgment(msgId, + batchMessageId, + __ -> doIndividualAck(__, properties), + __ -> doIndividualBatchAck(__, properties)); + case Cumulative: + if (batchMessageId != null) { + consumer.onAcknowledgeCumulative(batchMessageId, null); + } else { + consumer.onAcknowledgeCumulative(msgId, null); + } + if (batchMessageId == null || batchMessageId.ackCumulative()) { + return doCumulativeAck(msgId, properties, null); + } else if (batchIndexAckEnabled) { + return doCumulativeBatchIndexAck(batchMessageId, properties); + } else { + if (!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) { + doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null); + batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true); + } + return CompletableFuture.completedFuture(null); + } + default: + throw new IllegalStateException("Unknown AckType: " + ackType); } } @@ -278,9 +280,10 @@ private CompletableFuture doIndividualAck(MessageIdImpl messageId, Map doIndividualAckAsync(MessageIdImpl messageId) { pendingIndividualAcks.add(messageId); pendingIndividualBatchIndexAcks.remove(messageId); + return CompletableFuture.completedFuture(null); } private CompletableFuture doIndividualBatchAck(BatchMessageIdImpl batchMessageId, @@ -343,10 +346,9 @@ private CompletableFuture doCumulativeAck(MessageIdImpl messageId, Map doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) { ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( - new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), - batchMessageId.getPartitionIndex()), (v) -> { + batchMessageId.toMessageIdImpl(), __ -> { ConcurrentBitSetRecyclable value; if (batchMessageId.getAcker() != null && !(batchMessageId.getAcker() instanceof BatchMessageAckerDisabled)) { @@ -358,6 +360,7 @@ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) { return value; }); bitSet.clear(batchMessageId.getBatchIndex()); + return CompletableFuture.completedFuture(null); } private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {