From 8cfb1ac2f4b98b8178910f4d6c4b4c1adf85db42 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Thu, 22 Mar 2018 19:50:50 -0700 Subject: [PATCH 1/4] Improve batch message acking by removing batch message tracker --- .../broker/service/BatchMessageTest.java | 3 - .../pulsar/client/impl/BatchMessageAcker.java | 64 +++++++++ .../impl/BatchMessageAckerDisabled.java | 43 ++++++ .../client/impl/BatchMessageIdImpl.java | 34 ++++- .../pulsar/client/impl/ConsumerImpl.java | 133 ++---------------- .../client/impl/PartitionedConsumerImpl.java | 9 -- .../client/impl/TopicsConsumerImpl.java | 10 -- .../impl/BatchMessageAckerDisabledTest.java | 47 +++++++ .../client/impl/BatchMessageAckerTest.java | 71 ++++++++++ 9 files changed, 269 insertions(+), 145 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 2d7c7aea4b76c..baa814f68de33 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -42,7 +42,6 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -514,7 +513,6 @@ public void testNonBatchCumulativeAckAfterBatchPublish() throws Exception { Thread.sleep(100); rolloverPerIntervalStats(); assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0); - assertTrue(((ConsumerImpl) consumer).isBatchingAckTrackerEmpty()); consumer.close(); producer.close(); noBatchProducer.close(); @@ -574,7 +572,6 @@ public void testBatchAndNonBatchCumulativeAcks() throws Exception { } Thread.sleep(100); assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0); - assertTrue(((ConsumerImpl) consumer).isBatchingAckTrackerEmpty()); consumer.close(); producer.close(); noBatchProducer.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java new file mode 100644 index 0000000000000..fb8e6d98d982a --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import com.google.common.annotations.VisibleForTesting; +import java.util.BitSet; + +class BatchMessageAcker { + + static BatchMessageAcker newAcker(int batchSize) { + BitSet bitSet = new BitSet(batchSize); + bitSet.set(0, batchSize); + return new BatchMessageAcker(bitSet); + } + + // bitset shared across messages in the same batch. + private final BitSet bitSet; + + BatchMessageAcker(BitSet bitSet) { + this.bitSet = bitSet; + } + + @VisibleForTesting + BitSet getBitSet() { + return bitSet; + } + + public synchronized int getBatchSize() { + return bitSet.length(); + } + + public synchronized boolean ackIndividual(int batchIndex) { + bitSet.clear(batchIndex); + return bitSet.isEmpty(); + } + + public synchronized boolean ackCumulative(int batchIndex) { + // +1 since to argument is exclusive + bitSet.clear(0, batchIndex + 1); + return bitSet.isEmpty(); + } + + // debug purpose + public synchronized int getOutstandingAcks() { + return bitSet.cardinality(); + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java new file mode 100644 index 0000000000000..245e83057ef26 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +class BatchMessageAckerDisabled extends BatchMessageAcker { + + static final BatchMessageAckerDisabled INSTANCE = new BatchMessageAckerDisabled(); + + private BatchMessageAckerDisabled() { + super(null); + } + + @Override + public boolean ackIndividual(int batchIndex) { + return true; + } + + @Override + public boolean ackCumulative(int batchIndex) { + return true; + } + + @Override + public int getOutstandingAcks() { + return 0; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index f7c5bddb5156d..9381d3571fe6e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -27,17 +27,27 @@ public class BatchMessageIdImpl extends MessageIdImpl { private final static int NO_BATCH = -1; private final int batchIndex; + private final BatchMessageAcker acker; + public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex) { + this(ledgerId, entryId, partitionIndex, batchIndex, BatchMessageAckerDisabled.INSTANCE); + } + + public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex, BatchMessageAcker acker) { super(ledgerId, entryId, partitionIndex); this.batchIndex = batchIndex; + this.acker = acker; } public BatchMessageIdImpl(MessageIdImpl other) { super(other.ledgerId, other.entryId, other.partitionIndex); if (other instanceof BatchMessageIdImpl) { - this.batchIndex = ((BatchMessageIdImpl) other).batchIndex; + BatchMessageIdImpl otherId = (BatchMessageIdImpl) other; + this.batchIndex = otherId.batchIndex; + this.acker = otherId.acker; } else { this.batchIndex = NO_BATCH; + this.acker = BatchMessageAckerDisabled.INSTANCE; } } @@ -95,4 +105,26 @@ public String toString() { public byte[] toByteArray() { return toByteArray(batchIndex); } + + public boolean ackIndividual() { + return acker.ackIndividual(batchIndex); + } + + public boolean ackCumulative() { + return acker.ackCumulative(batchIndex); + } + + public int getOutstandingAcksInSameBatch() { + return acker.getOutstandingAcks(); + } + + public int getBatchSize() { + return acker.getBatchSize(); + } + + public MessageIdImpl prevBatchMessageId() { + return new MessageIdImpl( + ledgerId, entryId - 1, partitionIndex); + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 067a677707355..cea7181c5f0f2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -27,17 +27,13 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.NavigableMap; import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -109,7 +105,6 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final ReadWriteLock zeroQueueLock; private final UnAckedMessageTracker unAckedMessageTracker; - private final ConcurrentNavigableMap batchMessageAckTracker; protected final ConsumerStatsRecorder stats; private final int priorityLevel; @@ -154,7 +149,6 @@ enum SubscriptionMode { this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2; this.codecProvider = new CompressionCodecProvider(); this.priorityLevel = conf.getPriorityLevel(); - this.batchMessageAckTracker = new ConcurrentSkipListMap<>(); this.readCompacted = conf.isReadCompacted(); this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition(); @@ -218,7 +212,6 @@ public CompletableFuture unsubscribeAsync() { cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> { cnx.removeConsumer(consumerId); log.info("[{}][{}] Successfully unsubscribed from topic", topic, subscription); - batchMessageAckTracker.clear(); unAckedMessageTracker.close(); unsubscribeFuture.complete(null); setState(State.Closed); @@ -354,75 +347,23 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarCl } } - // we may not be able to ack message being acked by client. However messages in prior - // batch may be ackable - private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageId, MessageIdImpl message, - Map properties) { - // get entry before this message and ack that message on broker - MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message); - if (lowerKey != null) { - NavigableMap entriesUpto = batchMessageAckTracker.headMap(lowerKey, true); - for (Object key : entriesUpto.keySet()) { - entriesUpto.remove(key); - } - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] ack prior message {} to broker on cumulative ack for message {}", subscription, - consumerId, lowerKey, batchMessageId); - } - sendAcknowledge(lowerKey, AckType.Cumulative, properties); - } else { - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] no messages prior to message {}", subscription, consumerId, batchMessageId); - } - } - } - boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackType, Map properties) { - // we keep track of entire batch and so need MessageIdImpl and cannot use BatchMessageIdImpl - MessageIdImpl message = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), - batchMessageId.getPartitionIndex()); - BitSet bitSet = batchMessageAckTracker.get(message); - if (bitSet == null) { - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] message not found {} for ack {}", subscription, consumerId, batchMessageId, - ackType); - } - return true; - } - int batchIndex = batchMessageId.getBatchIndex(); - // bitset is not thread-safe and requires external synchronization - int batchSize = 0; - // only used for debug-logging - int outstandingAcks = 0; - boolean isAllMsgsAcked = false; - lock.writeLock().lock(); - try { - batchSize = bitSet.length(); - if (ackType == AckType.Individual) { - bitSet.clear(batchIndex); - } else { - // +1 since to argument is exclusive - bitSet.clear(0, batchIndex + 1); - } - isAllMsgsAcked = bitSet.isEmpty(); - if (log.isDebugEnabled()) { - outstandingAcks = bitSet.cardinality(); - } - } finally { - lock.writeLock().unlock(); + boolean isAllMsgsAcked; + if (ackType == AckType.Individual) { + isAllMsgsAcked = batchMessageId.ackIndividual(); + } else { + isAllMsgsAcked = batchMessageId.ackCumulative(); } + int outstandingAcks = batchMessageId.getOutstandingAcksInSameBatch(); + int batchSize = batchMessageId.getBatchSize(); // all messages in this batch have been acked if (isAllMsgsAcked) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] can ack message to broker {}, acktype {}, cardinality {}, length {}", subscription, consumerName, batchMessageId, ackType, outstandingAcks, batchSize); } - if (ackType == AckType.Cumulative) { - batchMessageAckTracker.keySet().removeIf(m -> (m.compareTo(message) <= 0)); - } - batchMessageAckTracker.remove(message); // increment Acknowledge-msg counter with number of messages in batch only if AckType is Individual. // CumulativeAckType is handled while sending ack to broker if (ackType == AckType.Individual) { @@ -430,9 +371,8 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp } return true; } else { - // we cannot ack this message to broker. but prior message may be ackable - if (ackType == AckType.Cumulative) { - ackMessagesInEarlierBatch(batchMessageId, message, properties); + if (outstandingAcks == batchSize - 1) { + sendAcknowledge(batchMessageId.prevBatchMessageId(), AckType.Cumulative, properties); } if (log.isDebugEnabled()) { log.debug("[{}] [{}] cannot ack message to broker {}, acktype {}, pending acks - {}", subscription, @@ -442,38 +382,6 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp return false; } - // if we are consuming a mix of batch and non-batch messages then cumulative ack on non-batch messages - // should clean up the ack tracker as well - private void updateBatchAckTracker(MessageIdImpl message, AckType ackType) { - if (batchMessageAckTracker.isEmpty()) { - return; - } - MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message); - if (lowerKey != null) { - NavigableMap entriesUpto = batchMessageAckTracker.headMap(lowerKey, true); - for (Object key : entriesUpto.keySet()) { - entriesUpto.remove(key); - } - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] updated batch ack tracker up to message {} on cumulative ack for message {}", - subscription, consumerId, lowerKey, message); - } - } else { - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] no messages to clean up prior to message {}", subscription, consumerId, message); - } - } - } - - /** - * helper method that returns current state of data structure used to track acks for batch messages - * - * @return true if all batch messages have been acknowledged - */ - public boolean isBatchingAckTrackerEmpty() { - return batchMessageAckTracker.isEmpty(); - } - @Override protected CompletableFuture doAcknowledge(MessageId messageId, AckType ackType, Map properties) { @@ -495,11 +403,6 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack return CompletableFuture.completedFuture(null); } } - // if we got a cumulative ack on non batch message, check if any earlier batch messages need to be removed - // from batch message tracker - if (ackType == AckType.Cumulative && !(messageId instanceof BatchMessageIdImpl)) { - updateBatchAckTracker((MessageIdImpl) messageId, ackType); - } return sendAcknowledge(messageId, ackType, properties); } @@ -559,7 +462,6 @@ public void connectionOpened(final ClientCnx cnx) { currentSize = incomingMessages.size(); startMessageId = clearReceiverQueue(); unAckedMessageTracker.clear(); - batchMessageAckTracker.clear(); } boolean isDurable = subscriptionMode == SubscriptionMode.Durable; @@ -701,7 +603,6 @@ public void connectionFailed(PulsarClientException exception) { @Override public CompletableFuture closeAsync() { if (getState() == State.Closing || getState() == State.Closed) { - batchMessageAckTracker.clear(); unAckedMessageTracker.close(); return CompletableFuture.completedFuture(null); } @@ -709,7 +610,6 @@ public CompletableFuture closeAsync() { if (!isConnected()) { log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription); setState(State.Closed); - batchMessageAckTracker.clear(); unAckedMessageTracker.close(); client.cleanupConsumer(this); return CompletableFuture.completedFuture(null); @@ -729,7 +629,6 @@ public CompletableFuture closeAsync() { if (exception == null || !cnx.ctx().channel().isActive()) { log.info("[{}] [{}] Closed consumer", topic, subscription); setState(State.Closed); - batchMessageAckTracker.clear(); unAckedMessageTracker.close(); closeFuture.complete(null); client.cleanupConsumer(this); @@ -924,15 +823,9 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc int batchSize = msgMetadata.getNumMessagesInBatch(); // create ack tracker for entry aka batch - BitSet bitSet = new BitSet(batchSize); MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex()); - bitSet.set(0, batchSize); - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] added bit set for message {}, cardinality {}, length {}", subscription, consumerName, - batchMessage, bitSet.cardinality(), bitSet.length()); - } - batchMessageAckTracker.put(batchMessage, bitSet); + BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize); unAckedMessageTracker.add(batchMessage); int skippedMessages = 0; @@ -970,7 +863,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc } BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(), - messageId.getEntryId(), getPartitionIndex(), i); + messageId.getEntryId(), getPartitionIndex(), i, acker); final MessageImpl message = new MessageImpl<>(batchMessageIdImpl, msgMetadata, singleMessageMetadataBuilder.build(), singleMessagePayload, cnx, schema); lock.readLock().lock(); @@ -987,9 +880,7 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc singleMessageMetadataBuilder.recycle(); } } catch (IOException e) { - // log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName); - batchMessageAckTracker.remove(batchMessage); discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError); } if (log.isDebugEnabled()) { @@ -1191,7 +1082,6 @@ public void redeliverUnacknowledgedMessages() { currentSize = incomingMessages.size(); incomingMessages.clear(); unAckedMessageTracker.clear(); - batchMessageAckTracker.clear(); } cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise()); if (currentSize > 0) { @@ -1231,7 +1121,6 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { batches.forEach(ids -> { List messageIdDatas = ids.stream().map(messageId -> { // attempt to remove message from batchMessageAckTracker - batchMessageAckTracker.remove(messageId); builder.setPartition(messageId.getPartitionIndex()); builder.setLedgerId(messageId.getLedgerId()); builder.setEntryId(messageId.getEntryId()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java index 2d8ad4ed8b2c7..4bd2fda280649 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java @@ -489,15 +489,6 @@ public CompletableFuture seekAsync(MessageId messageId) { return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on partitioned topics")); } - /** - * helper method that returns current state of data structure used to track acks for batch messages - * - * @return true if all batch messages have been acknowledged - */ - public boolean isBatchingAckTrackerEmpty() { - return consumers.stream().allMatch(ConsumerImpl::isBatchingAckTrackerEmpty); - } - List> getConsumers() { return consumers; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java index e39de96c1d236..1089cef53697a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java @@ -536,16 +536,6 @@ public CompletableFuture seekAsync(MessageId messageId) { return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on topics consumer")); } - /** - * helper method that returns current state of data structure used to track acks for batch messages - * - * @return true if all batch messages have been acknowledged - */ - public boolean isBatchingAckTrackerEmpty() { - return consumers.values().stream().allMatch(consumer -> consumer.isBatchingAckTrackerEmpty()); - } - - @Override public int getAvailablePermits() { return consumers.values().stream().mapToInt(ConsumerImpl::getAvailablePermits).sum(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java new file mode 100644 index 0000000000000..e32a2ef45f93e --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabledTest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import org.testng.annotations.Test; + +public class BatchMessageAckerDisabledTest { + + @Test + public void testAckIndividual() { + for (int i = 0; i < 10; i++) { + assertTrue(BatchMessageAckerDisabled.INSTANCE.ackIndividual(i)); + } + } + + @Test + public void testAckCumulative() { + for (int i = 0; i < 10; i++) { + assertTrue(BatchMessageAckerDisabled.INSTANCE.ackCumulative(i)); + } + } + + @Test + public void testGetOutstandingAcks() { + assertEquals(0, BatchMessageAckerDisabled.INSTANCE.getOutstandingAcks()); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java new file mode 100644 index 0000000000000..2bfa620d43ad9 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageAckerTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class BatchMessageAckerTest { + + private static final int BATCH_SIZE = 10; + + private BatchMessageAcker acker; + + @BeforeMethod + public void setup() { + acker = BatchMessageAcker.newAcker(10); + } + + @Test + public void testAckers() { + assertEquals(BATCH_SIZE, acker.getOutstandingAcks()); + assertEquals(BATCH_SIZE, acker.getBatchSize()); + + assertFalse(acker.ackIndividual(4)); + for (int i = 0; i < BATCH_SIZE; i++) { + if (4 == i) { + assertFalse(acker.getBitSet().get(i)); + } else { + assertTrue(acker.getBitSet().get(i)); + } + } + + assertFalse(acker.ackCumulative(6)); + for (int i = 0; i < BATCH_SIZE; i++) { + if (i <= 6) { + assertFalse(acker.getBitSet().get(i)); + } else { + assertTrue(acker.getBitSet().get(i)); + } + } + + for (int i = BATCH_SIZE - 1; i >= 8; i--) { + assertFalse(acker.ackIndividual(i)); + assertFalse(acker.getBitSet().get(i)); + } + + assertTrue(acker.ackIndividual(7)); + assertEquals(0, acker.getOutstandingAcks()); + } + +} From 46266ecfa5b634652c9732a84ac8f248435acb2e Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Fri, 23 Mar 2018 16:30:25 -0700 Subject: [PATCH 2/4] fix --- .../apache/pulsar/client/impl/BatchMessageAckerDisabled.java | 5 +++++ .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java index 245e83057ef26..dd842c902dc9f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java @@ -26,6 +26,11 @@ private BatchMessageAckerDisabled() { super(null); } + @Override + public synchronized int getBatchSize() { + return 0; + } + @Override public boolean ackIndividual(int batchIndex) { return true; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index cea7181c5f0f2..007fd260ce175 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -371,7 +371,7 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp } return true; } else { - if (outstandingAcks == batchSize - 1) { + if (AckType.Cumulative == ackType && outstandingAcks == batchSize - 1) { sendAcknowledge(batchMessageId.prevBatchMessageId(), AckType.Cumulative, properties); } if (log.isDebugEnabled()) { From ceb7b45df9a4eaaa772de3a001bc13c151602c5a Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Fri, 23 Mar 2018 19:56:36 -0700 Subject: [PATCH 3/4] Fix the cumulative ack --- .../apache/pulsar/broker/service/BatchMessageTest.java | 3 ++- .../org/apache/pulsar/client/impl/BatchMessageAcker.java | 9 +++++++++ .../apache/pulsar/client/impl/BatchMessageIdImpl.java | 4 ++++ .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 9 +++++++-- 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index baa814f68de33..533ce85077c7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -433,7 +434,7 @@ public void testOutOfOrderAcksForBatchMessage() throws Exception { Message lastunackedMsg = null; for (int i = 0; i < numMsgs; i++) { Message msg = consumer.receive(5, TimeUnit.SECONDS); - LOG.info("received message {}", String.valueOf(msg.getData())); + LOG.info("received message {}", new String(msg.getData(), UTF_8)); assertNotNull(msg); if (i == 8) { consumer.acknowledgeCumulative(msg); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java index fb8e6d98d982a..b59d1ca1a09b9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java @@ -31,6 +31,7 @@ static BatchMessageAcker newAcker(int batchSize) { // bitset shared across messages in the same batch. private final BitSet bitSet; + private boolean prevBatchCumulativelyAcked = false; BatchMessageAcker(BitSet bitSet) { this.bitSet = bitSet; @@ -61,4 +62,12 @@ public synchronized int getOutstandingAcks() { return bitSet.cardinality(); } + public void setPrevBatchCumulativelyAcked(boolean acked) { + this.prevBatchCumulativelyAcked = acked; + } + + public boolean isPrevBatchCumulativelyAcked() { + return prevBatchCumulativelyAcked; + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index 9381d3571fe6e..d87a6ab91be14 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -127,4 +127,8 @@ public MessageIdImpl prevBatchMessageId() { ledgerId, entryId - 1, partitionIndex); } + public BatchMessageAcker getAcker() { + return acker; + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 007fd260ce175..acd8c5564c19f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -355,7 +355,10 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp } else { isAllMsgsAcked = batchMessageId.ackCumulative(); } - int outstandingAcks = batchMessageId.getOutstandingAcksInSameBatch(); + int outstandingAcks = 0; + if (log.isDebugEnabled()) { + outstandingAcks = batchMessageId.getOutstandingAcksInSameBatch(); + } int batchSize = batchMessageId.getBatchSize(); // all messages in this batch have been acked @@ -371,8 +374,10 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp } return true; } else { - if (AckType.Cumulative == ackType && outstandingAcks == batchSize - 1) { + if (AckType.Cumulative == ackType + && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) { sendAcknowledge(batchMessageId.prevBatchMessageId(), AckType.Cumulative, properties); + batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true); } if (log.isDebugEnabled()) { log.debug("[{}] [{}] cannot ack message to broker {}, acktype {}, pending acks - {}", subscription, From 42fa9e0105dfcafc4c9d3316aa910fb76f1851f3 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 27 Mar 2018 22:38:57 -0700 Subject: [PATCH 4/4] Fix batchsize --- .../org/apache/pulsar/client/impl/BatchMessageAcker.java | 8 +++++--- .../pulsar/client/impl/BatchMessageAckerDisabled.java | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java index b59d1ca1a09b9..75e50aa2fdf50 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java @@ -26,15 +26,17 @@ class BatchMessageAcker { static BatchMessageAcker newAcker(int batchSize) { BitSet bitSet = new BitSet(batchSize); bitSet.set(0, batchSize); - return new BatchMessageAcker(bitSet); + return new BatchMessageAcker(bitSet, batchSize); } // bitset shared across messages in the same batch. + private final int batchSize; private final BitSet bitSet; private boolean prevBatchCumulativelyAcked = false; - BatchMessageAcker(BitSet bitSet) { + BatchMessageAcker(BitSet bitSet, int batchSize) { this.bitSet = bitSet; + this.batchSize = batchSize; } @VisibleForTesting @@ -43,7 +45,7 @@ BitSet getBitSet() { } public synchronized int getBatchSize() { - return bitSet.length(); + return batchSize; } public synchronized boolean ackIndividual(int batchIndex) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java index dd842c902dc9f..a521d63446e82 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAckerDisabled.java @@ -23,7 +23,7 @@ class BatchMessageAckerDisabled extends BatchMessageAcker { static final BatchMessageAckerDisabled INSTANCE = new BatchMessageAckerDisabled(); private BatchMessageAckerDisabled() { - super(null); + super(null, 0); } @Override