From 1eb278ddd04857977137e986048f1057fa1e0832 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 22 Dec 2022 23:10:45 +0800 Subject: [PATCH] [fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure Fixes https://github.com/apache/pulsar/issues/19030 ### Motivation When a `BatchMessageIdImpl` is created from a deserialization, the `BatchMessageAcker` object cannot be shared by all instances in the same batch, which leads to an acknowledgment failure when batch index ACK is disabled (by default). ### Modifications Maintain a map from the `(ledger id, entry id)` pair to the `BatchMessageAcker` in `ConsumerImpl`. If the `BatchMessageIdImpl` doesn't carry a valid `BatchMessageAcker`, create and cache a `BatchMessageAcker` instance and remove it when all messages in the batch are acknowledged. It requires a change in `MessageIdImpl#fromByteArray` that a `BatchMessageAckerDisabled` will be created to indicate there is no shared acker. To avoid making code more complicated, this patch refactors the existing code that many logics about consumer are moved from the ACK tracker to the consumer. It also removes the `AckType` parameter when acknowledging a list of messages. --- .../api/MessageIdSerializationTest.java | 105 ++++++++++++++ .../impl/AcknowledgmentsGroupingTracker.java | 8 +- .../pulsar/client/impl/ConsumerBase.java | 10 +- .../pulsar/client/impl/ConsumerImpl.java | 96 ++++++++++++- .../pulsar/client/impl/MessageIdImpl.java | 11 +- .../client/impl/MultiTopicsConsumerImpl.java | 2 +- ...rsistentAcknowledgmentGroupingTracker.java | 3 +- ...sistentAcknowledgmentsGroupingTracker.java | 134 ++++-------------- .../AcknowledgementsGroupingTrackerTest.java | 80 +++-------- 9 files changed, 262 insertions(+), 187 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageIdSerializationTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageIdSerializationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageIdSerializationTest.java new file mode 100644 index 0000000000000..ae68a520044e6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageIdSerializationTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Cleanup; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "broker-api") +public class MessageIdSerializationTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 30000) + public void testSerialization() throws Exception { + String topic = "test-serialization-origin"; + @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .batchingMaxMessages(100) + .batchingMaxPublishDelay(1, TimeUnit.DAYS) + .create(); + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .isAckReceiptEnabled(true) + .subscribe(); + + final int numMessages = 10; + for (int i = 0; i < numMessages; i++) { + producer.sendAsync(i); + } + producer.flush(); + final List msgIds = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + msgIds.add(consumer.receive().getMessageId()); + } + final AtomicLong ledgerId = new AtomicLong(-1L); + final AtomicLong entryId = new AtomicLong(-1L); + for (int i = 0; i < numMessages; i++) { + assertTrue(msgIds.get(i) instanceof BatchMessageIdImpl); + final BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgIds.get(i); + ledgerId.compareAndSet(-1L, batchMessageId.getLedgerId()); + assertEquals(batchMessageId.getLedgerId(), ledgerId.get()); + entryId.compareAndSet(-1L, batchMessageId.getEntryId()); + assertEquals(batchMessageId.getEntryId(), entryId.get()); + assertEquals(batchMessageId.getBatchSize(), numMessages); + } + + final List deserializedMsgIds = new ArrayList<>(); + for (MessageId msgId : msgIds) { + MessageId deserialized = MessageId.fromByteArray(msgId.toByteArray()); + assertTrue(deserialized instanceof BatchMessageIdImpl); + deserializedMsgIds.add(deserialized); + } + for (MessageId msgId : deserializedMsgIds) { + consumer.acknowledge(msgId); + } + consumer.close(); + + consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .isAckReceiptEnabled(true) + .subscribe(); + MessageId newMsgId = producer.send(0); + MessageId receivedMessageId = consumer.receive().getMessageId(); + assertEquals(newMsgId, receivedMessageId); + consumer.close(); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java index d46af1a99e7f0..d9e2f3e4c7bcd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java @@ -33,8 +33,12 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable { CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties); - CompletableFuture addListAcknowledgment(List messageIds, AckType ackType, - Map properties); + default CompletableFuture addBatchIndexAck(BatchMessageIdImpl msgId, AckType ackType, + Map properties) { + return CompletableFuture.completedFuture(null); + } + + CompletableFuture addListAcknowledgment(List messageIds, Map properties); void flush(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index dd4932ec2bca4..ef514f4389f99 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -542,12 +542,12 @@ public CompletableFuture acknowledgeAsync(Messages messages, Transactio @Override public CompletableFuture acknowledgeAsync(List messageIdList) { - return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), null); + return doAcknowledgeWithTxn(messageIdList, Collections.emptyMap(), null); } @Override public CompletableFuture acknowledgeAsync(List messageIdList, Transaction txn) { - return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), (TransactionImpl) txn); + return doAcknowledgeWithTxn(messageIdList, Collections.emptyMap(), (TransactionImpl) txn); } @Override @@ -655,17 +655,17 @@ public void negativeAcknowledge(Message message) { negativeAcknowledge(message.getMessageId()); } - protected CompletableFuture doAcknowledgeWithTxn(List messageIdList, AckType ackType, + protected CompletableFuture doAcknowledgeWithTxn(List messageIdList, Map properties, TransactionImpl txn) { CompletableFuture ackFuture; if (txn != null && this instanceof ConsumerImpl) { ackFuture = txn.registerAckedTopic(getTopic(), subscription) - .thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn)); + .thenCompose(ignored -> doAcknowledge(messageIdList, AckType.Individual, properties, txn)); // register the ackFuture as part of the transaction txn.registerAckOp(ackFuture); } else { - ackFuture = doAcknowledge(messageIdList, ackType, properties, txn); + ackFuture = doAcknowledge(messageIdList, AckType.Individual, properties, txn); } return ackFuture; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2680e70703022..ac79550aa317f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -62,9 +62,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nullable; import lombok.AccessLevel; import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.DeadLetterPolicy; @@ -204,6 +206,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final List previousExceptions = new CopyOnWriteArrayList(); + // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged + private final Map, BatchMessageAcker> batchMessageToAcker = new ConcurrentHashMap<>(); + static ConsumerImpl newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, @@ -529,6 +534,51 @@ protected CompletableFuture> internalBatchReceiveAsync() { return result; } + private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType ackType, int numMessages) { + if (ackType == AckType.Individual) { + stats.incrementNumAcksSent(numMessages); + unAckedMessageTracker.remove(messageId); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.remove(messageId); + } + } else { + stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId)); + } + } + + private @Nullable MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) { + final BatchMessageAcker acker; + if (messageId.getAcker() instanceof BatchMessageAckerDisabled) { + acker = batchMessageToAcker.computeIfAbsent( + Pair.of(messageId.getLedgerId(), messageId.getEntryId()), + __ -> BatchMessageAcker.newAcker(messageId.getOriginalBatchSize())); + } else { + acker = messageId.getAcker(); + } + if (ackType == AckType.Individual) { + if (acker.ackIndividual(messageId.getBatchIndex())) { + batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId())); + return messageId.toMessageIdImpl(); + } else { + return conf.isBatchIndexAckEnabled() ? messageId : null; + } + } else { + if (acker.ackCumulative(messageId.getBatchIndex())) { + batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId())); + return messageId.toMessageIdImpl(); + } else if (conf.isBatchIndexAckEnabled()) { + return messageId; + } else { + if (acker.isPrevBatchCumulativelyAcked()) { + return null; + } else { + acker.setPrevBatchCumulativelyAcked(true); + return messageId.prevBatchMessageId(); + } + } + } + } + @Override protected CompletableFuture doAcknowledge(MessageId messageId, AckType ackType, Map properties, @@ -549,13 +599,34 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())); } - return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, properties); + if (ackType == AckType.Individual) { + onAcknowledge(messageId, null); + } else { + onAcknowledgeCumulative(messageId, null); + } + if (messageId instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType); + if (messageIdImpl == null) { + return CompletableFuture.completedFuture(null); + } else if (messageIdImpl instanceof BatchMessageIdImpl) { + return acknowledgmentsGroupingTracker.addBatchIndexAck( + (BatchMessageIdImpl) messageIdImpl, ackType, properties); + } else { + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, batchMessageId.getOriginalBatchSize()); + return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties); + } + } else { + MessageIdImpl messageIdImpl = (MessageIdImpl) messageId; + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, 1); + return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties); + } } @Override protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { - + List messageIdListToAck = new ArrayList<>(); for (MessageId messageId : messageIdList) { checkArgument(messageId instanceof MessageIdImpl); } @@ -573,7 +644,26 @@ protected CompletableFuture doAcknowledge(List messageIdList, A return doTransactionAcknowledgeForResponse(messageIdList, ackType, null, properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())); } else { - return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, ackType, properties); + for (MessageId messageId : messageIdList) { + checkArgument(messageId instanceof MessageIdImpl); + onAcknowledge(messageId, null); + if (messageId instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType); + if (messageIdImpl != null) { + if (!(messageIdImpl instanceof BatchMessageIdImpl)) { + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, + batchMessageId.getOriginalBatchSize()); + } // else: batch index ACK + messageIdListToAck.add(messageIdImpl); + } + } else { + MessageIdImpl messageIdImpl = (MessageIdImpl) messageId; + processMessageIdBeforeAcknowledge(messageIdImpl, ackType, 1); + messageIdListToAck.add(messageIdImpl); + } + } + return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdListToAck, properties); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 02298e0f9d66d..1ffb5fb12385d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -95,14 +95,9 @@ public static MessageId fromByteArray(byte[] data) throws IOException { } MessageIdImpl messageId; - if (idData.hasBatchIndex()) { - if (idData.hasBatchSize()) { - messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), - idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAcker.newAcker(idData.getBatchSize())); - } else { - messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), - idData.getBatchIndex()); - } + if (idData.hasBatchIndex() && idData.hasBatchSize()) { + messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), + idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAckerDisabled.INSTANCE); } else if (idData.hasFirstChunkMessageId()) { MessageIdData firstChunkIdData = idData.getFirstChunkMessageId(); messageId = new ChunkMessageIdImpl( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 224276ba5f08f..2105fec21589f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -493,7 +493,7 @@ protected CompletableFuture doAcknowledge(List messageIdList, } topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> { ConsumerImpl consumer = consumers.get(topicPartitionName); - resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn) + resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, properties, txn) .thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove))); }); return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java index 32f8fb922304b..6faf95b5b6b36 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java @@ -50,8 +50,7 @@ public CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ac } @Override - public CompletableFuture addListAcknowledgment(List messageIds, - AckType ackType, + public CompletableFuture addListAcknowledgment(List messageIds, Map properties) { // no-op return CompletableFuture.completedFuture(null); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index fef0bcb8906f1..efd77e8a18221 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -18,18 +18,17 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; @@ -37,8 +36,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; -import javax.annotation.Nullable; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Triple; @@ -83,7 +80,6 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments private final ConcurrentHashMap pendingIndividualBatchIndexAcks; private final ScheduledFuture scheduledTask; - private final boolean batchIndexAckEnabled; private final boolean ackReceiptEnabled; public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, ConsumerConfigurationData conf, @@ -93,7 +89,6 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, Consum this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>(); this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros(); this.maxAckGroupSize = conf.getMaxAcknowledgmentGroupSize(); - this.batchIndexAckEnabled = conf.isBatchIndexAckEnabled(); this.ackReceiptEnabled = conf.isAckReceiptEnabled(); this.currentIndividualAckFuture = new TimedCompletableFuture<>(); this.currentCumulativeAckFuture = new TimedCompletableFuture<>(); @@ -129,53 +124,31 @@ public boolean isDuplicate(MessageId messageId) { } @Override - public CompletableFuture addListAcknowledgment(List messageIds, - AckType ackType, Map properties) { - if (AckType.Cumulative.equals(ackType)) { - if (consumer.isAckReceiptEnabled()) { - Set> completableFutureSet = new HashSet<>(); - messageIds.forEach(messageId -> - completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties))); - return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet)); + public CompletableFuture addListAcknowledgment(List messageIds, + Map properties) { + Optional readLock = acquireReadLock(); + try { + if (messageIds.size() != 0) { + addListAcknowledgment(messageIds); + return readLock.map(__ -> currentIndividualAckFuture) + .orElse(CompletableFuture.completedFuture(null)); } else { - messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties)); return CompletableFuture.completedFuture(null); } - } else { - Optional readLock = acquireReadLock(); - try { - if (messageIds.size() != 0) { - addListAcknowledgment(messageIds); - return readLock.map(__ -> currentIndividualAckFuture) - .orElse(CompletableFuture.completedFuture(null)); - } else { - return CompletableFuture.completedFuture(null); - } - } finally { - readLock.ifPresent(Lock::unlock); - if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= maxAckGroupSize) { - flush(); - } + } finally { + readLock.ifPresent(Lock::unlock); + if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= maxAckGroupSize) { + flush(); } } } - private void addListAcknowledgment(List messageIds) { - for (MessageId messageId : messageIds) { + private void addListAcknowledgment(List messageIds) { + for (MessageIdImpl messageId : messageIds) { if (messageId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; - addIndividualAcknowledgment(batchMessageId.toMessageIdImpl(), - batchMessageId, - this::doIndividualAckAsync, - this::doIndividualBatchAckAsync); - } else if (messageId instanceof MessageIdImpl) { - addIndividualAcknowledgment((MessageIdImpl) messageId, - null, - this::doIndividualAckAsync, - this::doIndividualBatchAckAsync); + doIndividualBatchAckAsync((BatchMessageIdImpl) messageId); } else { - throw new IllegalStateException("Unsupported message id type in addListAcknowledgement: " - + messageId.getClass().getCanonicalName()); + doIndividualAckAsync(messageId); } } } @@ -183,67 +156,21 @@ private void addListAcknowledgment(List messageIds) { @Override public CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties) { - if (msgId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId; - return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId); + checkArgument(!(msgId instanceof BatchMessageIdImpl)); + if (ackType == AckType.Individual) { + return doIndividualAck(msgId, properties); } else { - return addAcknowledgment(msgId, ackType, properties, null); + return doCumulativeAck(msgId, properties, null); } } - private CompletableFuture addIndividualAcknowledgment( - MessageIdImpl msgId, - @Nullable BatchMessageIdImpl batchMessageId, - Function> individualAckFunction, - Function> batchAckFunction) { - if (batchMessageId != null) { - consumer.onAcknowledge(batchMessageId, null); - } else { - consumer.onAcknowledge(msgId, null); - } - if (batchMessageId == null || batchMessageId.ackIndividual()) { - consumer.getStats().incrementNumAcksSent((batchMessageId != null) ? batchMessageId.getBatchSize() : 1); - consumer.getUnAckedMessageTracker().remove(msgId); - if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) { - consumer.getPossibleSendToDeadLetterTopicMessages().remove(msgId); - } - return individualAckFunction.apply(msgId); - } else if (batchIndexAckEnabled) { - return batchAckFunction.apply(batchMessageId); + @Override + public CompletableFuture addBatchIndexAck(BatchMessageIdImpl msgId, AckType ackType, + Map properties) { + if (ackType == AckType.Individual) { + return doIndividualBatchAck(msgId, properties); } else { - return CompletableFuture.completedFuture(null); - } - } - - private CompletableFuture addAcknowledgment(MessageIdImpl msgId, - AckType ackType, - Map properties, - @Nullable BatchMessageIdImpl batchMessageId) { - switch (ackType) { - case Individual: - return addIndividualAcknowledgment(msgId, - batchMessageId, - __ -> doIndividualAck(__, properties), - __ -> doIndividualBatchAck(__, properties)); - case Cumulative: - if (batchMessageId != null) { - consumer.onAcknowledgeCumulative(batchMessageId, null); - } else { - consumer.onAcknowledgeCumulative(msgId, null); - } - if (batchMessageId == null || batchMessageId.ackCumulative()) { - return doCumulativeAck(msgId, properties, null); - } else if (batchIndexAckEnabled) { - return doCumulativeBatchIndexAck(batchMessageId, properties); - } else { - if (!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) { - doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null); - batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true); - } - return CompletableFuture.completedFuture(null); - } - default: - throw new IllegalStateException("Unknown AckType: " + ackType); + return doCumulativeBatchIndexAck(msgId, properties); } } @@ -267,10 +194,9 @@ private CompletableFuture doIndividualAck(MessageIdImpl messageId, Map doIndividualAckAsync(MessageIdImpl messageId) { + private void doIndividualAckAsync(MessageIdImpl messageId) { pendingIndividualAcks.add(messageId); pendingIndividualBatchIndexAcks.remove(messageId); - return CompletableFuture.completedFuture(null); } private CompletableFuture doIndividualBatchAck(BatchMessageIdImpl batchMessageId, @@ -298,7 +224,6 @@ private CompletableFuture doIndividualBatchAck(BatchMessageIdImpl batchMes private CompletableFuture doCumulativeAck(MessageIdImpl messageId, Map properties, BitSetRecyclable bitSet) { - consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId)); if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an // uncommon condition since it's only used for the compaction subscription. @@ -314,7 +239,7 @@ private CompletableFuture doCumulativeAck(MessageIdImpl messageId, Map doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) { + private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) { ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( batchMessageId.toMessageIdImpl(), __ -> { ConcurrentBitSetRecyclable value; @@ -328,7 +253,6 @@ private CompletableFuture doIndividualBatchAckAsync(BatchMessageIdImpl bat return value; }); bitSet.clear(batchMessageId.getBatchIndex()); - return CompletableFuture.completedFuture(null); } private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index ddca6951e49e1..51e3a5099cc11 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -154,51 +154,30 @@ public void testBatchAckTracker(boolean isNeedReceipt) throws Exception { MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0); MessageIdImpl msg3 = new MessageIdImpl(5, 3, 0); - MessageIdImpl msg4 = new MessageIdImpl(5, 4, 0); - MessageIdImpl msg5 = new MessageIdImpl(5, 5, 0); - MessageIdImpl msg6 = new MessageIdImpl(5, 6, 0); assertFalse(tracker.isDuplicate(msg1)); - tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + tracker.addListAcknowledgment(Collections.singletonList(msg1), Collections.emptyMap()); assertTrue(tracker.isDuplicate(msg1)); assertFalse(tracker.isDuplicate(msg2)); - tracker.addListAcknowledgment(Collections.singletonList(msg5), AckType.Cumulative, Collections.emptyMap()); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); - // Flush while disconnected. the internal tracking will not change tracker.flush(); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); + assertFalse(tracker.isDuplicate(msg1)); + assertFalse(tracker.isDuplicate(msg2)); - tracker.addListAcknowledgment(Collections.singletonList(msg6), AckType.Individual, Collections.emptyMap()); - assertTrue(tracker.isDuplicate(msg6)); + tracker.addListAcknowledgment(Collections.singletonList(msg3), Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg3)); when(consumer.getClientCnx()).thenReturn(cnx); tracker.flush(); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); + assertFalse(tracker.isDuplicate(msg1)); + assertFalse(tracker.isDuplicate(msg2)); + assertFalse(tracker.isDuplicate(msg3)); tracker.close(); } @@ -247,7 +226,7 @@ public void testImmediateBatchAckingTracker(boolean isNeedReceipt) throws Except when(consumer.getClientCnx()).thenReturn(null); - tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + tracker.addListAcknowledgment(Collections.singletonList(msg1), Collections.emptyMap()); assertTrue(tracker.isDuplicate(msg1)); when(consumer.getClientCnx()).thenReturn(cnx); @@ -255,7 +234,7 @@ public void testImmediateBatchAckingTracker(boolean isNeedReceipt) throws Except tracker.flush(); assertFalse(tracker.isDuplicate(msg1)); - tracker.addListAcknowledgment(Collections.singletonList(msg2), AckType.Individual, Collections.emptyMap()); + tracker.addListAcknowledgment(Collections.singletonList(msg2), Collections.emptyMap()); tracker.flush(); // Since we were connected, the ack went out immediately @@ -337,52 +316,31 @@ public void testBatchAckTrackerMultiAck(boolean isNeedReceipt) throws Exception MessageIdImpl msg1 = new MessageIdImpl(5, 1, 0); MessageIdImpl msg2 = new MessageIdImpl(5, 2, 0); - MessageIdImpl msg3 = new MessageIdImpl(5, 3, 0); - MessageIdImpl msg4 = new MessageIdImpl(5, 4, 0); - MessageIdImpl msg5 = new MessageIdImpl(5, 5, 0); - MessageIdImpl msg6 = new MessageIdImpl(5, 6, 0); + MessageIdImpl msg3 = new MessageIdImpl(5, 6, 0); assertFalse(tracker.isDuplicate(msg1)); - tracker.addListAcknowledgment(Collections.singletonList(msg1), AckType.Individual, Collections.emptyMap()); + tracker.addListAcknowledgment(Collections.singletonList(msg1), Collections.emptyMap()); assertTrue(tracker.isDuplicate(msg1)); assertFalse(tracker.isDuplicate(msg2)); - tracker.addListAcknowledgment(Collections.singletonList(msg5), AckType.Cumulative, Collections.emptyMap()); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); - // Flush while disconnected. the internal tracking will not change tracker.flush(); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); + assertFalse(tracker.isDuplicate(msg1)); + assertFalse(tracker.isDuplicate(msg2)); - tracker.addListAcknowledgment(Collections.singletonList(msg6), AckType.Individual, Collections.emptyMap()); - assertTrue(tracker.isDuplicate(msg6)); + tracker.addListAcknowledgment(Collections.singletonList(msg3), Collections.emptyMap()); + assertTrue(tracker.isDuplicate(msg3)); when(consumer.getClientCnx()).thenReturn(cnx); tracker.flush(); - assertTrue(tracker.isDuplicate(msg1)); - assertTrue(tracker.isDuplicate(msg2)); - assertTrue(tracker.isDuplicate(msg3)); - - assertTrue(tracker.isDuplicate(msg4)); - assertTrue(tracker.isDuplicate(msg5)); - assertFalse(tracker.isDuplicate(msg6)); + assertFalse(tracker.isDuplicate(msg1)); + assertFalse(tracker.isDuplicate(msg2)); + assertFalse(tracker.isDuplicate(msg3)); tracker.close(); }