diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index 7be0590fe53af..a57f2c8ccb4f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -44,8 +44,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; @@ -337,8 +336,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { } totalMessages++; consumer1.acknowledge(msg); - MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId()); - receivedPtns.add(msgId.getPartitionIndex()); + receivedPtns.add(((PulsarApiMessageId) msg.getMessageId()).getPartition()); } assertTrue(Sets.difference(listener1.activePtns, receivedPtns).isEmpty()); @@ -354,8 +352,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { } totalMessages++; consumer2.acknowledge(msg); - MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId()); - receivedPtns.add(msgId.getPartitionIndex()); + receivedPtns.add(((PulsarApiMessageId) msg.getMessageId()).getPartition()); } assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty()); assertTrue(Sets.difference(listener2.activePtns, receivedPtns).isEmpty()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index b6f1771c08882..808de13339440 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -50,7 +50,6 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.RelativeTimeUtil; import org.awaitility.Awaitility; @@ -679,8 +678,7 @@ public void testSeekByFunction() throws Exception { if (message == null) { break; } - TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId(); - received.add(topicMessageId.getInnerMessageId()); + received.add(message.getMessageId()); } int msgNumFromPartition1 = list.size() / 2; int msgNumFromPartition2 = 1; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java new file mode 100644 index 0000000000000..a05b67336b0ad --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import lombok.AllArgsConstructor; +import lombok.Cleanup; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "broker-api") +public class CustomMessageIdTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 30000) + public void testSeek() throws Exception { + final String topic = "persistent://my-property/my-ns/test-seek-" + System.currentTimeMillis(); + final var msgIdList = produceMessages(topic); + @Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .subscribe(); + final int ackIndex = msgIdList.size() / 2 + 1; + consumer.seek(msgIdList.get(ackIndex)); + final var msg = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(msg); + assertEquals(msg.getValue(), "msg-" + (ackIndex + 1)); + } + + @Test(timeOut = 30000) + public void testAck() throws Exception { + final String topic = "persistent://my-property/my-ns/test-ack-" + System.currentTimeMillis(); + @Cleanup final var consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .isAckReceiptEnabled(true) + .subscribe(); + produceMessages(topic); + final int ackIndex = 3; + NonBatchedMessageId messageIdToAck = null; + for (int i = 0; i < 10; i++) { + var msg = consumer.receive(); + var msgId = (PulsarApiMessageId) msg.getMessageId(); + if (i == ackIndex) { + messageIdToAck = new NonBatchedMessageId(msgId.getLedgerId(), msgId.getEntryId()); + } + } + assertNotNull(messageIdToAck); + consumer.acknowledgeCumulative(messageIdToAck); + consumer.redeliverUnacknowledgedMessages(); + var msg = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(msg); + assertEquals(msg.getValue(), "msg-" + (ackIndex + 1)); + } + + private List produceMessages(String topic) throws PulsarClientException { + @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .create(); + final var msgIdList = new ArrayList(); + for (int i = 0; i < 10; i++) { + final var msgId = (PulsarApiMessageId) producer.send("msg-" + i); + msgIdList.add(new NonBatchedMessageId(msgId.getLedgerId(), msgId.getEntryId())); + } + return msgIdList; + } + + @AllArgsConstructor + private static class NonBatchedMessageId implements PulsarApiMessageId { + // For non-batched message id in a single topic, only ledger id and entry id are required + + private final long ledgerId; + private final long entryId; + + @Override + public byte[] toByteArray() { + return new byte[0]; // dummy implementation + } + + @Override + public long getLedgerId() { + return ledgerId; + } + + @Override + public long getEntryId() { + return entryId; + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java index e0e1bf20e7cbc..93acee14a9dd9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java @@ -39,7 +39,6 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PartitionedProducerImpl; -import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.common.naming.TopicName; import org.awaitility.Awaitility; @@ -768,7 +767,7 @@ public void testMessageIdForSubscribeToSinglePartition() throws Exception { for (int i = 0; i < totalMessages; i ++) { msg = consumer1.receive(5, TimeUnit.SECONDS); - Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), 2); + Assert.assertEquals(((PulsarApiMessageId) msg.getMessageId()).getPartition(), 2); consumer1.acknowledge(msg); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java index 42da60906483c..eb7e7cf683ef6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java @@ -39,6 +39,7 @@ import lombok.AllArgsConstructor; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerInterceptor; import org.apache.pulsar.client.api.Message; @@ -176,10 +177,10 @@ private AckTestData prepareDataForAck(String topic) throws PulsarClientException messageIds.add(message.getMessageId()); } MessageId firstEntryMessageId = messageIds.get(0); - MessageId secondEntryMessageId = ((BatchMessageIdImpl) messageIds.get(1)).toMessageIdImpl(); + MessageId secondEntryMessageId = MessageIdImpl.from((PulsarApiMessageId) messageIds.get(1)); // Verify messages 2 to N must be in the same entry for (int i = 2; i < messageIds.size(); i++) { - assertEquals(((BatchMessageIdImpl) messageIds.get(i)).toMessageIdImpl(), secondEntryMessageId); + assertEquals(MessageIdImpl.from((PulsarApiMessageId) messageIds.get(i)), secondEntryMessageId); } assertTrue(interceptor.individualAckedMessageIdList.isEmpty()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java index c4cdcbd19d575..375bbff8a4df4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java @@ -118,9 +118,6 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException, Message message = consumer.receive(); assertEquals(new String(message.getData()), messagePrefix + i); MessageId messageId = message.getMessageId(); - if (topicType == TopicType.PARTITIONED) { - messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId(); - } assertTrue(messageIds.remove(messageId), "Failed to receive message"); } log.info("Remaining message IDs = {}", messageIds); @@ -166,9 +163,6 @@ public void producerSend(TopicType topicType) throws PulsarClientException, Puls for (int i = 0; i < numberOfMessages; i++) { MessageId messageId = consumer.receive().getMessageId(); - if (topicType == TopicType.PARTITIONED) { - messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId(); - } assertTrue(messageIds.remove(messageId), "Failed to receive Message"); } log.info("Remaining message IDs = {}", messageIds); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 679d6c1a19e51..b4855769bdf4c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -30,9 +30,9 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; @@ -291,7 +291,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception { .negativeAckRedeliveryDelay(100, TimeUnit.SECONDS) .subscribe(); - MessageId messageId = new MessageIdImpl(3, 1, 0); + PulsarApiMessageId messageId = new MessageIdImpl(3, 1, 0); TopicMessageIdImpl topicMessageId = new TopicMessageIdImpl("topic-1", "topic-1", messageId); BatchMessageIdImpl batchMessageId = new BatchMessageIdImpl(3, 1, 0, 0); BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(3, 1, 0, 1); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 5dbb3935ce98a..e0e4a5cdd0305 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -38,9 +38,8 @@ import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.cli.NoSplitter; -import org.apache.pulsar.client.impl.BatchMessageIdImpl; -import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.util.RelativeTimeUtil; @Parameters(commandDescription = "Operations on persistent topics. The persistent-topics " @@ -611,12 +610,11 @@ void run() throws PulsarAdminException { if (++position != 1) { System.out.println("-------------------------------------------------------------------------\n"); } - if (msg.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId(); + PulsarApiMessageId msgId = (PulsarApiMessageId) msg.getMessageId(); + if (msgId.isBatch()) { System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex()); } else { - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); } if (msg.getProperties().size() > 0) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index eb8bee3bb8452..781cbf8ab6c95 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -55,6 +55,7 @@ import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.cli.NoSplitter; import org.apache.pulsar.client.impl.BatchMessageIdImpl; @@ -1192,12 +1193,11 @@ void run() throws PulsarAdminException { if (++position != 1) { System.out.println("-------------------------------------------------------------------------\n"); } + PulsarApiMessageId msgId = (PulsarApiMessageId) msg.getMessageId(); if (message.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex()); } else { - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); } @@ -1251,12 +1251,11 @@ void run() throws PulsarAdminException { MessageImpl message = (MessageImpl) getTopics().examineMessage(persistentTopic, initialPosition, messagePosition); - if (message.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + PulsarApiMessageId msgId = (PulsarApiMessageId) message.getMessageId(); + if (msgId.isBatch()) { System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex()); } else { - MessageIdImpl msgId = (MessageIdImpl) message.getMessageId(); System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); } @@ -1310,12 +1309,11 @@ void run() throws PulsarAdminException { System.out.println("Cannot find any messages based on ledgerId:" + ledgerId + " entryId:" + entryId); } else { - if (message.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + PulsarApiMessageId msgId = (PulsarApiMessageId) message.getMessageId(); + if (msgId.isBatch()) { System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex()); } else { - MessageIdImpl msgId = (MessageIdImpl) message.getMessageId(); System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java index d46af1a99e7f0..2a4ddb7e35d3f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.common.api.proto.CommandAck.AckType; /** @@ -31,7 +32,8 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable { boolean isDuplicate(MessageId messageId); - CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties); + CompletableFuture addAcknowledgment(PulsarApiMessageId msgId, AckType ackType, + Map properties); CompletableFuture addListAcknowledgment(List messageIds, AckType ackType, Map properties); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java index 1c9b66fd2bad5..dfcba24b2abc4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java @@ -20,6 +20,7 @@ import java.util.BitSet; +@Deprecated public class BatchMessageAcker { private BatchMessageAcker() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index 7e3a143dff8e0..ffdd6a629d6b3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -18,19 +18,30 @@ */ package org.apache.pulsar.client.impl; +import java.util.BitSet; import javax.annotation.Nonnull; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; +import org.apache.pulsar.client.util.MessageIdUtils; + +// When the batch index ACK is disabled and a batched message is acknowledged cumulatively, the previous message might +// be acknowledged instead. Since a message should not be acknowledged repeatedly, this interface defines a method to +// determine whether the previous message can be acknowledged. +interface PreviousMessageAcknowledger { + + boolean canAckPreviousMessage(); +} /** */ -public class BatchMessageIdImpl extends MessageIdImpl { +public class BatchMessageIdImpl extends MessageIdImpl implements PreviousMessageAcknowledger { private static final long serialVersionUID = 1L; - static final int NO_BATCH = -1; private final int batchIndex; private final int batchSize; - private final transient BatchMessageAcker acker; + private final BitSet ackSet; + private volatile boolean prevBatchCumulativelyAcked = false; // Private constructor used only for json deserialization @SuppressWarnings("unused") @@ -42,51 +53,41 @@ public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int b this(ledgerId, entryId, partitionIndex, batchIndex, 0, BatchMessageAckerDisabled.INSTANCE); } + public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex, int batchSize, + BitSet ackSet) { + super(ledgerId, entryId, partitionIndex); + this.batchIndex = batchIndex; + this.batchSize = batchSize; + this.ackSet = ackSet; + } + + @Deprecated public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex, int batchSize, BatchMessageAcker acker) { super(ledgerId, entryId, partitionIndex); this.batchIndex = batchIndex; this.batchSize = batchSize; - this.acker = acker; + this.ackSet = acker.getBitSet(); } - public BatchMessageIdImpl(MessageIdImpl other) { - super(other.ledgerId, other.entryId, other.partitionIndex); - if (other instanceof BatchMessageIdImpl) { - BatchMessageIdImpl otherId = (BatchMessageIdImpl) other; - this.batchIndex = otherId.batchIndex; - this.batchSize = otherId.batchSize; - this.acker = otherId.acker; - } else { - this.batchIndex = NO_BATCH; - this.batchSize = 0; - this.acker = BatchMessageAckerDisabled.INSTANCE; - } + public BatchMessageIdImpl(PulsarApiMessageId other) { + this(other.getLedgerId(), other.getEntryId(), other.getPartition(), + other.getBatchIndex(), other.getBatchSize(), other.getAckSet()); } + @Override public int getBatchIndex() { return batchIndex; } @Override public int compareTo(@Nonnull MessageId o) { - if (o instanceof MessageIdImpl) { - MessageIdImpl other = (MessageIdImpl) o; - int batchIndex = (o instanceof BatchMessageIdImpl) ? ((BatchMessageIdImpl) o).batchIndex : NO_BATCH; - return messageIdCompare( - this.ledgerId, this.entryId, this.partitionIndex, this.batchIndex, - other.ledgerId, other.entryId, other.partitionIndex, batchIndex - ); - } else if (o instanceof TopicMessageIdImpl) { - return compareTo(((TopicMessageIdImpl) o).getInnerMessageId()); - } else { - throw new UnsupportedOperationException("Unknown MessageId type: " + o.getClass().getName()); - } + return super.compareTo(o); } @Override public int hashCode() { - return messageIdHashCode(ledgerId, entryId, partitionIndex, batchIndex); + return super.hashCode(); } @Override @@ -105,39 +106,65 @@ public byte[] toByteArray() { return toByteArray(batchIndex, batchSize); } + @Deprecated public boolean ackIndividual() { - return acker.ackIndividual(batchIndex); + return MessageIdUtils.acknowledge(this, true); } + @Deprecated public boolean ackCumulative() { - return acker.ackCumulative(batchIndex); + return MessageIdUtils.acknowledge(this, false); } + @Deprecated public int getOutstandingAcksInSameBatch() { - return acker.getOutstandingAcks(); + return 0; + } + + @Override + public BitSet getAckSet() { + return ackSet; } + @Override public int getBatchSize() { - return acker.getBatchSize(); + return batchSize; } + @Deprecated public int getOriginalBatchSize() { return this.batchSize; } + @Deprecated public MessageIdImpl prevBatchMessageId() { - return new MessageIdImpl( - ledgerId, entryId - 1, partitionIndex); + return MessageIdImpl.prevMessageId(this); + } + + public static BatchMessageIdImpl prevMessageId(PulsarApiMessageId msgId) { + if (msgId.isBatch()) { + return new BatchMessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartition(), + msgId.getBatchIndex() - 1); + } else { + return new BatchMessageIdImpl(msgId.getLedgerId(), msgId.getEntryId() - 1, + msgId.getPartition(), -1); + } } // MessageIdImpl is widely used as the key of a hash map, in this case, we should convert the batch message id to // have the correct hash code. + @Deprecated public MessageIdImpl toMessageIdImpl() { - return new MessageIdImpl(ledgerId, entryId, partitionIndex); + return MessageIdImpl.from(this); } - public BatchMessageAcker getAcker() { - return acker; + @Override + public boolean canAckPreviousMessage() { + if (prevBatchCumulativelyAcked) { + return false; + } else { + prevBatchCumulativelyAcked = true; + return true; + } } - } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java index 28d5047c8ef25..0bf0317d3604c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java @@ -22,6 +22,7 @@ import io.netty.buffer.Unpooled; import java.util.Objects; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.common.api.proto.MessageIdData; public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId { @@ -32,10 +33,12 @@ public ChunkMessageIdImpl(MessageIdImpl firstChunkMsgId, MessageIdImpl lastChunk this.firstChunkMsgId = firstChunkMsgId; } - public MessageIdImpl getFirstChunkMessageId() { + @Override + public PulsarApiMessageId getFirstChunkMessageId() { return firstChunkMsgId; } + @Deprecated public MessageIdImpl getLastChunkMessageId() { return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 3a8bb1867c91b..413c82a0bcc83 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.Messages; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; @@ -82,7 +83,7 @@ public abstract class ConsumerBase extends HandlerState implements Consumer> incomingMessages; - protected ConcurrentOpenHashMap unAckedChunkedMessageIdSequenceMap; + protected ConcurrentOpenHashMap unAckedChunkedMessageIdSequenceMap; protected final ConcurrentLinkedQueue>> pendingReceives; protected final int maxReceiverQueueSize; private volatile int currentReceiverQueueSize; @@ -128,7 +129,7 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat // Always use growable queue since items can exceed the advertised size this.incomingMessages = new GrowableArrayBlockingQueue<>(); this.unAckedChunkedMessageIdSequenceMap = - ConcurrentOpenHashMap.newBuilder().build(); + ConcurrentOpenHashMap.newBuilder().build(); this.executorProvider = executorProvider; this.externalPinnedExecutor = executorProvider.getExecutor(); this.internalPinnedExecutor = client.getInternalExecutorService(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2909a62933e16..b7a981c2c0db5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -73,6 +74,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException; import org.apache.pulsar.client.api.Schema; @@ -85,6 +87,7 @@ import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.client.util.MessageIdUtils; import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.EncryptionContext; @@ -257,11 +260,11 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat this.consumerId = client.newConsumerId(); this.subscriptionMode = conf.getSubscriptionMode(); if (startMessageId != null) { - if (startMessageId instanceof ChunkMessageIdImpl) { - this.startMessageId = new BatchMessageIdImpl( - ((ChunkMessageIdImpl) startMessageId).getFirstChunkMessageId()); + PulsarApiMessageId msgIdData = (PulsarApiMessageId) startMessageId; + if (msgIdData.getFirstChunkMessageId() != null) { + this.startMessageId = new BatchMessageIdImpl(msgIdData.getFirstChunkMessageId()); } else { - this.startMessageId = new BatchMessageIdImpl((MessageIdImpl) startMessageId); + this.startMessageId = new BatchMessageIdImpl(msgIdData); } } this.initialStartMessageId = this.startMessageId; @@ -544,7 +547,6 @@ protected CompletableFuture> internalBatchReceiveAsync() { protected CompletableFuture doAcknowledge(MessageId messageId, AckType ackType, Map properties, TransactionImpl txn) { - checkArgument(messageId instanceof MessageIdImpl); if (getState() != State.Ready && getState() != State.Connecting) { stats.incrementNumAcksFailed(); PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState()); @@ -560,16 +562,14 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())); } - return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, properties); + return acknowledgmentsGroupingTracker.addAcknowledgment((PulsarApiMessageId) messageId, + ackType, properties); } @Override protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { - for (MessageId messageId : messageIdList) { - checkArgument(messageId instanceof MessageIdImpl); - } if (getState() != State.Ready && getState() != State.Connecting) { stats.incrementNumAcksFailed(); PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState()); @@ -601,10 +601,6 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a .InvalidMessageException("Cannot handle message with null messageId")); } - if (messageId instanceof TopicMessageIdImpl) { - messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId(); - } - checkArgument(messageId instanceof MessageIdImpl); if (getState() != State.Ready && getState() != State.Connecting) { stats.incrementNumAcksFailed(); PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState()); @@ -657,7 +653,6 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime))); - MessageId finalMessageId = messageId; if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) { initDeadLetterProducerIfNeeded(); @@ -667,7 +662,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a .value(retryMessage.getData()) .properties(propertiesMap); typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { - doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> { + doAcknowledge(messageId, ackType, Collections.emptyMap(), null).thenAccept(v -> { result.complete(null); }).exceptionally(ex -> { result.completeExceptionally(ex); @@ -693,7 +688,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a typedMessageBuilderNew.key(message.getKey()); } typedMessageBuilderNew.sendAsync() - .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) + .thenCompose(__ -> doAcknowledge(messageId, ackType, Collections.emptyMap(), null)) .thenAccept(v -> result.complete(null)) .exceptionally(ex -> { result.completeExceptionally(ex); @@ -704,12 +699,11 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a result.completeExceptionally(e); } } - MessageId finalMessageId = messageId; result.exceptionally(ex -> { log.error("Send to retry letter topic exception with topic: {}, messageId: {}", - retryLetterProducer.getTopic(), finalMessageId, ex); - Set messageIds = Collections.singleton(finalMessageId); - unAckedMessageTracker.remove(finalMessageId); + retryLetterProducer.getTopic(), messageId, ex); + Set messageIds = Collections.singleton(messageId); + unAckedMessageTracker.remove(messageId); redeliverUnacknowledgedMessages(messageIds); return null; }); @@ -731,12 +725,7 @@ private SortedMap getPropertiesMap(Message message, } private String getOriginMessageIdStr(Message message) { - if (message instanceof TopicMessageImpl) { - return ((TopicMessageIdImpl) message.getMessageId()).getInnerMessageId().toString(); - } else if (message instanceof MessageImpl) { - return message.getMessageId().toString(); - } - return null; + return message.getMessageId().toString(); } private String getOriginTopicNameStr(Message message) { @@ -943,17 +932,7 @@ private BatchMessageIdImpl clearReceiverQueue() { if (!currentMessageQueue.isEmpty()) { MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId(); - BatchMessageIdImpl previousMessage; - if (nextMessageInQueue instanceof BatchMessageIdImpl) { - // Get on the previous message within the current batch - previousMessage = new BatchMessageIdImpl(nextMessageInQueue.getLedgerId(), - nextMessageInQueue.getEntryId(), nextMessageInQueue.getPartitionIndex(), - ((BatchMessageIdImpl) nextMessageInQueue).getBatchIndex() - 1); - } else { - // Get on previous message in previous entry - previousMessage = new BatchMessageIdImpl(nextMessageInQueue.getLedgerId(), - nextMessageInQueue.getEntryId() - 1, nextMessageInQueue.getPartitionIndex(), -1); - } + BatchMessageIdImpl previousMessage = BatchMessageIdImpl.prevMessageId(nextMessageInQueue); // release messages if they are pooled messages currentMessageQueue.forEach(Message::release); return previousMessage; @@ -1122,7 +1101,7 @@ protected MessageImpl newSingleMessage(final int index, final Schema schema, final boolean containMetadata, final BitSetRecyclable ackBitSet, - final BatchMessageAcker acker, + final BitSet ackSetForBatchIndexAckDisabled, final int redeliveryCount, final long consumerEpoch) { if (log.isDebugEnabled()) { @@ -1157,7 +1136,7 @@ protected MessageImpl newSingleMessage(final int index, } BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(), - messageId.getEntryId(), getPartitionIndex(), index, numMessages, acker); + messageId.getEntryId(), getPartitionIndex(), index, numMessages, ackSetForBatchIndexAckDisabled); final ByteBuf payloadBuffer = (singleMessagePayload != null) ? singleMessagePayload : payload; final MessageImpl message = MessageImpl.create(topicName.toString(), batchMessageIdImpl, @@ -1389,7 +1368,7 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien } private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId, - MessageIdData messageId, ClientCnx cnx) { + MessageIdData messageId, ClientCnx cnx) { // Lazy task scheduling to expire incomplete chunk message if (expireTimeOfIncompleteChunkedMessageMillis > 0 && expireChunkMessageTaskScheduled.compareAndSet(false, @@ -1521,19 +1500,23 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, possibleToDeadLetter = new ArrayList<>(); } - BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize); BitSetRecyclable ackBitSet = null; if (ackSet != null && ackSet.size() > 0) { ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet)); } + // When batch index ack is disabled, the message id needs to store a BitSet to record which messages in the + // batch are acknowledged. + BitSet ackSetForBatchIndexAckDisabled = new BitSet(batchSize); + ackSetForBatchIndexAckDisabled.set(0, batchSize); + SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); int skippedMessages = 0; try { for (int i = 0; i < batchSize; ++i) { final MessageImpl message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata, singleMessageMetadata, uncompressedPayload, batchMessage, schema, true, - ackBitSet, acker, redeliveryCount, consumerEpoch); + ackBitSet, ackSetForBatchIndexAckDisabled, redeliveryCount, consumerEpoch); if (message == null) { skippedMessages++; continue; @@ -1629,12 +1612,8 @@ protected void trackMessage(MessageId messageId) { } protected void trackMessage(MessageId messageId, int redeliveryCount) { - if (conf.getAckTimeoutMillis() > 0 && messageId instanceof MessageIdImpl) { - MessageIdImpl id = (MessageIdImpl) messageId; - if (id instanceof BatchMessageIdImpl) { - // do not add each item in batch message into tracker - id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex()); - } + if (conf.getAckTimeoutMillis() > 0) { + MessageIdImpl id = MessageIdImpl.from((PulsarApiMessageId) messageId); if (hasParentConsumer) { //TODO: check parent consumer here // we should no longer track this message, TopicsConsumer will take care from now onwards @@ -1770,7 +1749,7 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCo } private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload, - ClientCnx currentCnx, boolean checkMaxMessageSize) { + ClientCnx currentCnx, boolean checkMaxMessageSize) { CompressionType compressionType = msgMetadata.getCompression(); CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); int uncompressedSize = msgMetadata.getUncompressedSize(); @@ -1823,7 +1802,7 @@ private void discardCorruptedMessage(MessageIdImpl messageId, ClientCnx currentC } private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx, - ValidationError validationError) { + ValidationError validationError) { log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(), messageId.getEntryId()); discardMessage(messageId, currentCnx, validationError); @@ -1931,8 +1910,6 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { return; } - checkArgument(messageIds.stream().findFirst().get() instanceof MessageIdImpl); - if (conf.getSubscriptionType() != SubscriptionType.Shared && conf.getSubscriptionType() != SubscriptionType.Key_Shared) { // We cannot redeliver single messages if subscription type is not Shared @@ -1942,9 +1919,9 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { ClientCnx cnx = cnx(); if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getValue()) { int messagesFromQueue = removeExpiredMessagesFromQueue(messageIds); - Iterable> batches = Iterables.partition( + Iterable> batches = Iterables.partition( messageIds.stream() - .map(messageId -> (MessageIdImpl) messageId) + .map(messageId -> (PulsarApiMessageId) messageId) .collect(Collectors.toSet()), MAX_REDELIVER_UNACKNOWLEDGED); batches.forEach(ids -> { getRedeliveryMessageIdData(ids).thenAccept(messageIdData -> { @@ -1985,7 +1962,7 @@ protected void completeOpBatchReceive(OpBatchReceive op) { notifyPendingBatchReceivedCallBack(op.future); } - private CompletableFuture> getRedeliveryMessageIdData(List messageIds) { + private CompletableFuture> getRedeliveryMessageIdData(List messageIds) { if (messageIds == null || messageIds.isEmpty()) { return CompletableFuture.completedFuture(Collections.emptyList()); } @@ -1994,7 +1971,7 @@ private CompletableFuture> getRedeliveryMessageIdData(List { if (!sendToDLQ) { return new MessageIdData() - .setPartition(messageId.getPartitionIndex()) + .setPartition(messageId.getPartition()) .setLedgerId(messageId.getLedgerId()) .setEntryId(messageId.getEntryId()); } @@ -2005,20 +1982,17 @@ private CompletableFuture> getRedeliveryMessageIdData(List processPossibleToDLQ(MessageIdImpl messageId) { + private CompletableFuture processPossibleToDLQ(PulsarApiMessageId messageId) { List> deadLetterMessages = null; if (possibleSendToDeadLetterTopicMessages != null) { - if (messageId instanceof BatchMessageIdImpl) { - messageId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), - getPartitionIndex()); - } + messageId = MessageIdImpl.from(messageId); deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(messageId); } CompletableFuture result = new CompletableFuture<>(); if (deadLetterMessages != null) { initDeadLetterProducerIfNeeded(); List> finalDeadLetterMessages = deadLetterMessages; - MessageIdImpl finalMessageId = messageId; + PulsarApiMessageId finalMessageId = messageId; deadLetterProducer.thenAcceptAsync(producerDLQ -> { for (MessageImpl message : finalDeadLetterMessages) { String originMessageIdStr = getOriginMessageIdStr(message); @@ -2152,7 +2126,7 @@ private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, ClientCnx cnx = cnx(); BatchMessageIdImpl originSeekMessageId = seekMessageId; - seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId); + seekMessageId = new BatchMessageIdImpl((PulsarApiMessageId) seekId); duringSeek.set(true); log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy); @@ -2195,23 +2169,24 @@ public CompletableFuture seekAsync(MessageId messageId) { return seekAsyncCheckState(seekBy).orElseGet(() -> { long requestId = client.newRequestId(); ByteBuf seek = null; - if (messageId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId; - // Initialize ack set - BitSetRecyclable ackSet = BitSetRecyclable.create(); - ackSet.set(0, msgId.getBatchSize()); - ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0)); - long[] ackSetArr = ackSet.toLongArray(); - ackSet.recycle(); - - seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr); - } else if (messageId instanceof ChunkMessageIdImpl) { - ChunkMessageIdImpl msgId = (ChunkMessageIdImpl) messageId; - seek = Commands.newSeek(consumerId, requestId, msgId.getFirstChunkMessageId().getLedgerId(), - msgId.getFirstChunkMessageId().getEntryId(), new long[0]); + PulsarApiMessageId msgIdData = (PulsarApiMessageId) messageId; + if (msgIdData.getFirstChunkMessageId() != null) { + PulsarApiMessageId firstMsgId = msgIdData.getFirstChunkMessageId(); + seek = Commands.newSeek(consumerId, requestId, firstMsgId.getLedgerId(), firstMsgId.getEntryId(), + new long[0]); } else { - MessageIdImpl msgId = (MessageIdImpl) messageId; - seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), new long[0]); + final long[] ackSetArr; + if (msgIdData.isBatch()) { + BitSetRecyclable ackSet = BitSetRecyclable.create(); + ackSet.set(0, msgIdData.getBatchSize()); + ackSet.clear(0, Math.max(msgIdData.getBatchIndex(), 0)); + ackSetArr = ackSet.toLongArray(); + ackSet.recycle(); + } else { + ackSetArr = new long[0]; + } + seek = Commands.newSeek(consumerId, requestId, msgIdData.getLedgerId(), msgIdData.getEntryId(), + ackSetArr); } return seekAsyncInternal(requestId, seek, messageId, seekBy); }); @@ -2243,9 +2218,8 @@ public CompletableFuture hasMessageAvailableAsync() { } future.thenAccept(response -> { - MessageIdImpl lastMessageId = MessageIdImpl.convertToMessageIdImpl(response.lastMessageId); - MessageIdImpl markDeletePosition = MessageIdImpl - .convertToMessageIdImpl(response.markDeletePosition); + PulsarApiMessageId lastMessageId = (PulsarApiMessageId) response.lastMessageId; + PulsarApiMessageId markDeletePosition = (PulsarApiMessageId) response.markDeletePosition; if (markDeletePosition != null && !(markDeletePosition.getEntryId() < 0 && markDeletePosition.getLedgerId() > lastMessageId.getLedgerId())) { @@ -2431,12 +2405,7 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, } private MessageIdImpl getMessageIdImpl(Message msg) { - MessageIdImpl messageId = (MessageIdImpl) msg.getMessageId(); - if (messageId instanceof BatchMessageIdImpl) { - // messageIds contain MessageIdImpl, not BatchMessageIdImpl - messageId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex()); - } - return messageId; + return MessageIdImpl.from((PulsarApiMessageId) msg.getMessageId()); } @@ -2686,32 +2655,26 @@ private void removeChunkMessage(String msgUUID, ChunkedMessageCtx chunkedMsgCtx, private CompletableFuture doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType, ValidationError validationError, Map properties, TxnID txnID) { - BitSetRecyclable bitSetRecyclable = null; - long ledgerId; - long entryId; ByteBuf cmd; long requestId = client.newRequestId(); - if (messageId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; - bitSetRecyclable = BitSetRecyclable.create(); - ledgerId = batchMessageId.getLedgerId(); - entryId = batchMessageId.getEntryId(); + PulsarApiMessageId msgIdData = (PulsarApiMessageId) messageId; + final long ledgerId = msgIdData.getLedgerId(); + final long entryId = msgIdData.getEntryId(); + if (msgIdData.isBatch()) { + BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create(); if (ackType == AckType.Cumulative) { - batchMessageId.ackCumulative(); - bitSetRecyclable.set(0, batchMessageId.getBatchSize()); - bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1); + MessageIdUtils.acknowledge(msgIdData, false); + bitSetRecyclable.set(0, msgIdData.getBatchSize()); + bitSetRecyclable.clear(0, msgIdData.getBatchIndex() + 1); } else { - bitSetRecyclable.set(0, batchMessageId.getBatchSize()); - bitSetRecyclable.clear(batchMessageId.getBatchIndex()); + bitSetRecyclable.set(0, msgIdData.getBatchSize()); + bitSetRecyclable.clear(msgIdData.getBatchIndex()); } cmd = Commands.newAck(consumerId, ledgerId, entryId, bitSetRecyclable, ackType, validationError, properties, - txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId, batchMessageId.getBatchSize()); + txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId, msgIdData.getBatchSize()); bitSetRecyclable.recycle(); } else { - MessageIdImpl singleMessage = (MessageIdImpl) messageId; - ledgerId = singleMessage.getLedgerId(); - entryId = singleMessage.getEntryId(); - cmd = Commands.newAck(consumerId, ledgerId, entryId, bitSetRecyclable, ackType, + cmd = Commands.newAck(consumerId, ledgerId, entryId, null, ackType, validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId); } @@ -2733,28 +2696,27 @@ private CompletableFuture doTransactionAcknowledgeForResponse(MessageId me private CompletableFuture doTransactionAcknowledgeForResponse(List messageIds, AckType ackType, ValidationError validationError, Map properties, TxnID txnID) { - BitSetRecyclable bitSetRecyclable = null; - long ledgerId; - long entryId; ByteBuf cmd; long requestId = client.newRequestId(); List messageIdDataList = new LinkedList<>(); for (MessageId messageId : messageIds) { - if (messageId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; - bitSetRecyclable = BitSetRecyclable.create(); + PulsarApiMessageId msgId = (PulsarApiMessageId) messageId; + final long ledgerId = msgId.getLedgerId(); + final long entryId = msgId.getEntryId(); + if (msgId.isBatch()) { + BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create(); if (ackType == AckType.Cumulative) { - batchMessageId.ackCumulative(); - bitSetRecyclable.set(0, batchMessageId.getBatchSize()); - bitSetRecyclable.clear(0, batchMessageId.getBatchIndex() + 1); + MessageIdUtils.acknowledge(msgId, false); + bitSetRecyclable.set(0, msgId.getBatchSize()); + bitSetRecyclable.clear(0, msgId.getBatchIndex() + 1); } else { - bitSetRecyclable.set(0, batchMessageId.getBatchSize()); - bitSetRecyclable.clear(batchMessageId.getBatchIndex()); + bitSetRecyclable.set(0, msgId.getBatchSize()); + bitSetRecyclable.clear(msgId.getBatchIndex()); } MessageIdData messageIdData = new MessageIdData(); - messageIdData.setLedgerId(batchMessageId.getLedgerId()); - messageIdData.setEntryId(batchMessageId.getEntryId()); - messageIdData.setBatchSize(batchMessageId.getBatchSize()); + messageIdData.setLedgerId(ledgerId); + messageIdData.setEntryId(entryId); + messageIdData.setBatchSize(msgId.getBatchSize()); long[] as = bitSetRecyclable.toLongArray(); for (int i = 0; i < as.length; i++) { messageIdData.addAckSet(as[i]); @@ -2762,9 +2724,6 @@ private CompletableFuture doTransactionAcknowledgeForResponse(List getIndex() { if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { - if (msgMetadata.hasNumMessagesInBatch() && messageId instanceof BatchMessageIdImpl) { - int batchSize = ((BatchMessageIdImpl) messageId).getBatchSize(); - int batchIndex = ((BatchMessageIdImpl) messageId).getBatchIndex(); - return Optional.of(brokerEntryMetadata.getIndex() - batchSize + batchIndex + 1); + final PulsarApiMessageId msgIdData = (PulsarApiMessageId) messageId; + if (msgMetadata.hasNumMessagesInBatch() && msgIdData.isBatch()) { + return Optional.of(brokerEntryMetadata.getIndex() + - msgIdData.getBatchSize() + msgIdData.getBatchIndex() + 1); } return Optional.of(brokerEntryMetadata.getIndex()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java index dcae86bd01a3b..1854d24d0cbe9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; +import java.util.BitSet; import java.util.List; import lombok.NonNull; import org.apache.pulsar.client.api.Message; @@ -50,9 +51,9 @@ protected MessagePayloadContextImpl newObject(Handle private MessageIdImpl messageId; private ConsumerImpl consumer; private int redeliveryCount; - private BatchMessageAcker acker; private BitSetRecyclable ackBitSet; private long consumerEpoch; + private BitSet ackSetForBatchIndexAckDisabled; private MessagePayloadContextImpl(final Recycler.Handle handle) { this.recyclerHandle = handle; @@ -73,10 +74,11 @@ public static MessagePayloadContextImpl get(final BrokerEntryMetadata brokerEntr context.messageId = messageId; context.consumer = consumer; context.redeliveryCount = redeliveryCount; - context.acker = BatchMessageAcker.newAcker(context.getNumMessages()); context.ackBitSet = (ackSet != null && ackSet.size() > 0) ? BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet)) : null; + context.ackSetForBatchIndexAckDisabled = new BitSet(context.messageMetadata.getNumMessagesInBatch()); + context.ackSetForBatchIndexAckDisabled.set(0, context.messageMetadata.getNumMessagesInBatch()); return context; } @@ -88,7 +90,6 @@ public void recycle() { consumer = null; redeliveryCount = 0; consumerEpoch = DEFAULT_CONSUMER_EPOCH; - acker = null; if (ackBitSet != null) { ackBitSet.recycle(); ackBitSet = null; @@ -134,7 +135,7 @@ public Message getMessageAt(int index, schema, containMetadata, ackBitSet, - acker, + ackSetForBatchIndexAckDisabled, redeliveryCount, consumerEpoch); } finally { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java index 6e60239ffe537..f40e3476dd0e0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiMessageIdImpl.java @@ -29,6 +29,7 @@ * This is useful when MessageId is need for partition/multi-topics/pattern consumer. * e.g. seek(), ackCumulative(), getLastMessageId(). */ +@Deprecated public class MultiMessageIdImpl implements MessageId { @Getter private Map map; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 32431415a6c0f..6dba8e1dd6242 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Messages; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException; import org.apache.pulsar.client.api.Schema; @@ -139,7 +140,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { this.pausedConsumers = new ConcurrentLinkedQueue<>(); this.allTopicPartitionsNumber = new AtomicInteger(0); this.startMessageId = startMessageId != null - ? new BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(startMessageId)) + ? new BatchMessageIdImpl((PulsarApiMessageId) startMessageId) : null; this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec; this.paused = conf.isStartPaused(); @@ -462,18 +463,16 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack } if (ackType == AckType.Cumulative) { - Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName()); + Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName()); if (individualConsumer != null) { - MessageId innerId = topicMessageId.getInnerMessageId(); - return individualConsumer.acknowledgeCumulativeAsync(innerId); + return individualConsumer.acknowledgeCumulativeAsync(topicMessageId); } else { return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); } } else { ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName()); - MessageId innerId = topicMessageId.getInnerMessageId(); - return consumer.doAcknowledgeWithTxn(innerId, ackType, properties, txnImpl) + return consumer.doAcknowledgeWithTxn(messageId, ackType, properties, txnImpl) .thenRun(() -> unAckedMessageTracker.remove(topicMessageId)); } @@ -500,7 +499,7 @@ protected CompletableFuture doAcknowledge(List messageIdList, } TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; topicToMessageIdMap.putIfAbsent(topicMessageId.getTopicPartitionName(), new ArrayList<>()); - topicToMessageIdMap.get(topicMessageId.getTopicPartitionName()).add(topicMessageId.getInnerMessageId()); + topicToMessageIdMap.get(topicMessageId.getTopicPartitionName()).add(topicMessageId); } topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> { ConsumerImpl consumer = consumers.get(topicPartitionName); @@ -547,7 +546,7 @@ public void negativeAcknowledge(MessageId messageId) { TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; ConsumerImpl consumer = consumers.get(topicMessageId.getTopicPartitionName()); - consumer.negativeAcknowledge(topicMessageId.getInnerMessageId()); + consumer.negativeAcknowledge(topicMessageId); } @Override @@ -700,12 +699,11 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { return; } removeExpiredMessagesFromQueue(messageIds); - messageIds.stream().map(messageId -> (TopicMessageIdImpl) messageId) - .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, Collectors.toSet())) - .forEach((topicName, messageIds1) -> - consumers.get(topicName) - .redeliverUnacknowledgedMessages(messageIds1.stream() - .map(mid -> mid.getInnerMessageId()).collect(Collectors.toSet()))); + messageIds.stream().collect(Collectors.groupingBy( + msgId -> ((TopicMessageIdImpl) msgId).getTopicPartitionName(), + Collectors.toSet()) + ).forEach((topicName, messageIds1) -> + consumers.get(topicName).redeliverUnacknowledgedMessages(messageIds1)); resumeReceivingFromPausedConsumersIfNeeded(); } @@ -759,14 +757,13 @@ public CompletableFuture seekAsync(Function function) { @Override public CompletableFuture seekAsync(MessageId messageId) { - MessageIdImpl targetMessageId = MessageIdImpl.convertToMessageIdImpl(messageId); - if (targetMessageId == null || isIllegalMultiTopicsMessageId(messageId)) { + if ((!(messageId instanceof PulsarApiMessageId)) || isIllegalMultiTopicsMessageId(messageId)) { return FutureUtil.failedFuture( new PulsarClientException("Illegal messageId, messageId can only be earliest/latest") ); } List> futures = new ArrayList<>(consumers.size()); - consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(targetMessageId))); + consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(messageId))); unAckedMessageTracker.clear(); clearIncomingMessages(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index 8680f0f0e6c73..375fe0d674352 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -95,17 +96,6 @@ public synchronized void add(Message message) { } private synchronized void add(MessageId messageId, int redeliveryCount) { - if (messageId instanceof TopicMessageIdImpl) { - TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; - messageId = topicMessageId.getInnerMessageId(); - } - - if (messageId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; - messageId = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), - batchMessageId.getPartitionIndex()); - } - if (nackedMessages == null) { nackedMessages = new HashMap<>(); } @@ -116,7 +106,7 @@ private synchronized void add(MessageId messageId, int redeliveryCount) { } else { backoffNs = nackDelayNanos; } - nackedMessages.put(messageId, System.nanoTime() + backoffNs); + nackedMessages.put(MessageIdImpl.from((PulsarApiMessageId) messageId), System.nanoTime() + backoffNs); if (this.timeout == null) { // Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java index 32f8fb922304b..583060303e14a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.common.api.proto.CommandAck.AckType; /** @@ -43,8 +44,9 @@ public boolean isDuplicate(MessageId messageId) { return false; } - public CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties) { + @Override + public CompletableFuture addAcknowledgment(PulsarApiMessageId msgId, AckType ackType, + Map properties) { // no-op return CompletableFuture.completedFuture(null); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index fef0bcb8906f1..2d810800a6e46 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -23,6 +23,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -38,13 +39,14 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; -import javax.annotation.Nullable; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.util.MessageIdUtils; import org.apache.pulsar.client.util.TimedCompletableFuture; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.protocol.Commands; @@ -79,8 +81,8 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments * This is a set of all the individual acks that the application has issued and that were not already sent to * broker. */ - private final ConcurrentSkipListSet pendingIndividualAcks; - private final ConcurrentHashMap pendingIndividualBatchIndexAcks; + private final ConcurrentSkipListSet pendingIndividualAcks; + private final ConcurrentHashMap pendingIndividualBatchIndexAcks; private final ScheduledFuture scheduledTask; private final boolean batchIndexAckEnabled; @@ -113,18 +115,16 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, Consum */ @Override public boolean isDuplicate(MessageId messageId) { - if (!(messageId instanceof MessageIdImpl)) { + if (!(messageId instanceof PulsarApiMessageId)) { throw new IllegalArgumentException("isDuplicated cannot accept " + messageId.getClass().getName() + ": " + messageId); } - if (lastCumulativeAck.compareTo(messageId) >= 0) { + final PulsarApiMessageId messageIdData = (PulsarApiMessageId) messageId; + if (lastCumulativeAck.compareTo(messageIdData) >= 0) { // Already included in a cumulative ack return true; } else { - final MessageIdImpl messageIdImpl = (messageId instanceof BatchMessageIdImpl) - ? ((BatchMessageIdImpl) messageId).toMessageIdImpl() - : (MessageIdImpl) messageId; - return pendingIndividualAcks.contains(messageIdImpl); + return pendingIndividualAcks.contains(MessageIdImpl.from(messageIdData)); } } @@ -134,11 +134,12 @@ public CompletableFuture addListAcknowledgment(List messageIds, if (AckType.Cumulative.equals(ackType)) { if (consumer.isAckReceiptEnabled()) { Set> completableFutureSet = new HashSet<>(); - messageIds.forEach(messageId -> - completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties))); + messageIds.forEach(messageId -> completableFutureSet.add( + addAcknowledgment((PulsarApiMessageId) messageId, ackType, properties))); return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet)); } else { - messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties)); + messageIds.forEach(messageId -> + addAcknowledgment((PulsarApiMessageId) messageId, ackType, properties)); return CompletableFuture.completedFuture(null); } } else { @@ -162,15 +163,10 @@ public CompletableFuture addListAcknowledgment(List messageIds, private void addListAcknowledgment(List messageIds) { for (MessageId messageId : messageIds) { - if (messageId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; - addIndividualAcknowledgment(batchMessageId.toMessageIdImpl(), - batchMessageId, - this::doIndividualAckAsync, - this::doIndividualBatchAckAsync); - } else if (messageId instanceof MessageIdImpl) { - addIndividualAcknowledgment((MessageIdImpl) messageId, - null, + if (messageId instanceof PulsarApiMessageId) { + PulsarApiMessageId messageIdData = (PulsarApiMessageId) messageId; + addIndividualAcknowledgment(MessageIdImpl.from(messageIdData), + messageIdData, this::doIndividualAckAsync, this::doIndividualBatchAckAsync); } else { @@ -181,35 +177,26 @@ private void addListAcknowledgment(List messageIds) { } @Override - public CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ackType, + public CompletableFuture addAcknowledgment(PulsarApiMessageId msgId, AckType ackType, Map properties) { - if (msgId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId; - return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId); - } else { - return addAcknowledgment(msgId, ackType, properties, null); - } + return addAcknowledgment(MessageIdImpl.from(msgId), ackType, properties, msgId); } private CompletableFuture addIndividualAcknowledgment( MessageIdImpl msgId, - @Nullable BatchMessageIdImpl batchMessageId, - Function> individualAckFunction, - Function> batchAckFunction) { - if (batchMessageId != null) { - consumer.onAcknowledge(batchMessageId, null); - } else { - consumer.onAcknowledge(msgId, null); - } - if (batchMessageId == null || batchMessageId.ackIndividual()) { - consumer.getStats().incrementNumAcksSent((batchMessageId != null) ? batchMessageId.getBatchSize() : 1); + PulsarApiMessageId msgIdData, + Function> individualAckFunction, + Function> batchAckFunction) { + consumer.onAcknowledge(msgIdData, null); + if (MessageIdUtils.acknowledge(msgIdData, true)) { + consumer.getStats().incrementNumAcksSent(msgIdData.isBatch() ? msgIdData.getBatchSize() : 1); consumer.getUnAckedMessageTracker().remove(msgId); if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) { consumer.getPossibleSendToDeadLetterTopicMessages().remove(msgId); } - return individualAckFunction.apply(msgId); + return individualAckFunction.apply(msgIdData); } else if (batchIndexAckEnabled) { - return batchAckFunction.apply(batchMessageId); + return batchAckFunction.apply(msgIdData); } else { return CompletableFuture.completedFuture(null); } @@ -218,27 +205,22 @@ private CompletableFuture addIndividualAcknowledgment( private CompletableFuture addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map properties, - @Nullable BatchMessageIdImpl batchMessageId) { + PulsarApiMessageId msgIdData) { switch (ackType) { case Individual: return addIndividualAcknowledgment(msgId, - batchMessageId, + msgIdData, __ -> doIndividualAck(__, properties), - __ -> doIndividualBatchAck(__, properties)); + msgIdData2 -> doIndividualBatchAck(msgIdData2, properties)); case Cumulative: - if (batchMessageId != null) { - consumer.onAcknowledgeCumulative(batchMessageId, null); - } else { - consumer.onAcknowledgeCumulative(msgId, null); - } - if (batchMessageId == null || batchMessageId.ackCumulative()) { + consumer.onAcknowledgeCumulative(msgIdData, null); + if (MessageIdUtils.acknowledge(msgIdData, false)) { return doCumulativeAck(msgId, properties, null); } else if (batchIndexAckEnabled) { - return doCumulativeBatchIndexAck(batchMessageId, properties); + return doCumulativeBatchIndexAck(msgIdData, properties); } else { - if (!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) { - doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null); - batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true); + if (((PreviousMessageAcknowledger) msgIdData).canAckPreviousMessage()) { + doCumulativeAck(MessageIdImpl.prevMessageId(msgIdData), properties, null); } return CompletableFuture.completedFuture(null); } @@ -247,7 +229,7 @@ private CompletableFuture addAcknowledgment(MessageIdImpl msgId, } } - private CompletableFuture doIndividualAck(MessageIdImpl messageId, Map properties) { + private CompletableFuture doIndividualAck(PulsarApiMessageId messageId, Map properties) { if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an // uncommon condition since it's only used for the compaction subscription. @@ -267,26 +249,27 @@ private CompletableFuture doIndividualAck(MessageIdImpl messageId, Map doIndividualAckAsync(MessageIdImpl messageId) { - pendingIndividualAcks.add(messageId); - pendingIndividualBatchIndexAcks.remove(messageId); + private CompletableFuture doIndividualAckAsync(PulsarApiMessageId messageId) { + MessageIdImpl messageIdImpl = MessageIdImpl.from(messageId); + pendingIndividualAcks.add(messageIdImpl); + pendingIndividualBatchIndexAcks.remove(messageIdImpl); return CompletableFuture.completedFuture(null); } - private CompletableFuture doIndividualBatchAck(BatchMessageIdImpl batchMessageId, + private CompletableFuture doIndividualBatchAck(PulsarApiMessageId msgIdData, Map properties) { if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { - return doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(), - batchMessageId.getBatchSize(), AckType.Individual, properties); + return doImmediateBatchIndexAck(msgIdData, msgIdData.getBatchIndex(), + msgIdData.getBatchSize(), AckType.Individual, properties); } else { - return doIndividualBatchAck(batchMessageId); + return doIndividualBatchAck(msgIdData); } } - private CompletableFuture doIndividualBatchAck(BatchMessageIdImpl batchMessageId) { + private CompletableFuture doIndividualBatchAck(PulsarApiMessageId msgIdData) { Optional readLock = acquireReadLock(); try { - doIndividualBatchAckAsync(batchMessageId); + doIndividualBatchAckAsync(msgIdData); return readLock.map(__ -> currentIndividualAckFuture).orElse(CompletableFuture.completedFuture(null)); } finally { readLock.ifPresent(Lock::unlock); @@ -296,7 +279,7 @@ private CompletableFuture doIndividualBatchAck(BatchMessageIdImpl batchMes } } - private CompletableFuture doCumulativeAck(MessageIdImpl messageId, Map properties, + private CompletableFuture doCumulativeAck(PulsarApiMessageId messageId, Map properties, BitSetRecyclable bitSet) { consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId)); if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { @@ -314,43 +297,43 @@ private CompletableFuture doCumulativeAck(MessageIdImpl messageId, Map doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) { + private CompletableFuture doIndividualBatchAckAsync(PulsarApiMessageId msgIdData) { ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( - batchMessageId.toMessageIdImpl(), __ -> { + MessageIdImpl.from(msgIdData), __ -> { + BitSet ackSet = msgIdData.getAckSet(); ConcurrentBitSetRecyclable value; - if (batchMessageId.getAcker() != null - && !(batchMessageId.getAcker() instanceof BatchMessageAckerDisabled)) { - value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet()); + if (ackSet != null) { + value = ConcurrentBitSetRecyclable.create(ackSet); } else { value = ConcurrentBitSetRecyclable.create(); - value.set(0, batchMessageId.getOriginalBatchSize()); + value.set(0, msgIdData.getBatchSize()); } return value; }); - bitSet.clear(batchMessageId.getBatchIndex()); + bitSet.clear(msgIdData.getBatchIndex()); return CompletableFuture.completedFuture(null); } - private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) { + private void doCumulativeAckAsync(PulsarApiMessageId msgId, BitSetRecyclable bitSet) { // Handle concurrent updates from different threads lastCumulativeAck.update(msgId, bitSet); } - private CompletableFuture doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId, + private CompletableFuture doCumulativeBatchIndexAck(PulsarApiMessageId msgIdData, Map properties) { if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { - return doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(), - batchMessageId.getBatchSize(), AckType.Cumulative, properties); + return doImmediateBatchIndexAck(msgIdData, msgIdData.getBatchIndex(), + msgIdData.getBatchSize(), AckType.Cumulative, properties); } else { BitSetRecyclable bitSet = BitSetRecyclable.create(); - bitSet.set(0, batchMessageId.getBatchSize()); - bitSet.clear(0, batchMessageId.getBatchIndex() + 1); - return doCumulativeAck(batchMessageId, null, bitSet); + bitSet.set(0, msgIdData.getBatchSize()); + bitSet.clear(0, msgIdData.getBatchIndex() + 1); + return doCumulativeAck(msgIdData, null, bitSet); } } - private CompletableFuture doImmediateAck(MessageIdImpl msgId, AckType ackType, Map properties, - BitSetRecyclable bitSet) { + private CompletableFuture doImmediateAck(PulsarApiMessageId msgId, AckType ackType, + Map properties, BitSetRecyclable bitSet) { ClientCnx cnx = consumer.getClientCnx(); if (cnx == null) { @@ -360,7 +343,8 @@ private CompletableFuture doImmediateAck(MessageIdImpl msgId, AckType ackT return newImmediateAckAndFlush(consumer.consumerId, msgId, bitSet, ackType, properties, cnx); } - private CompletableFuture doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int batchSize, + private CompletableFuture doImmediateBatchIndexAck(PulsarApiMessageId msgIdData, int batchIndex, + int batchSize, AckType ackType, Map properties) { ClientCnx cnx = consumer.getClientCnx(); @@ -369,8 +353,8 @@ private CompletableFuture doImmediateBatchIndexAck(BatchMessageIdImpl msgI .ConnectException("Consumer connect fail! consumer state:" + consumer.getState())); } BitSetRecyclable bitSet; - if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) { - bitSet = BitSetRecyclable.valueOf(msgId.getAcker().getBitSet().toLongArray()); + if (msgIdData.getAckSet() != null) { + bitSet = BitSetRecyclable.valueOf(msgIdData.getAckSet().toLongArray()); } else { bitSet = BitSetRecyclable.create(); bitSet.set(0, batchSize); @@ -382,7 +366,8 @@ private CompletableFuture doImmediateBatchIndexAck(BatchMessageIdImpl msgI } CompletableFuture completableFuture = newMessageAckCommandAndWrite(cnx, consumer.consumerId, - msgId.ledgerId, msgId.entryId, bitSet, ackType, properties, true, null, null); + msgIdData.getLedgerId(), msgIdData.getEntryId(), + bitSet, ackType, properties, true, null, null); bitSet.recycle(); return completableFuture; } @@ -414,7 +399,7 @@ private void flushAsync(ClientCnx cnx) { boolean shouldFlush = false; if (lastCumulativeAckToFlush != null) { shouldFlush = true; - final MessageIdImpl messageId = lastCumulativeAckToFlush.getMessageId(); + final PulsarApiMessageId messageId = lastCumulativeAckToFlush.getMessageId(); newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(), lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, Collections.emptyMap(), false, @@ -429,7 +414,7 @@ private void flushAsync(ClientCnx cnx) { if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { // We can send 1 single protobuf command with all individual acks while (true) { - MessageIdImpl msgId = pendingIndividualAcks.pollFirst(); + PulsarApiMessageId msgId = pendingIndividualAcks.pollFirst(); if (msgId == null) { break; } @@ -452,7 +437,7 @@ private void flushAsync(ClientCnx cnx) { } else { // When talking to older brokers, send the acknowledgements individually while (true) { - MessageIdImpl msgId = pendingIndividualAcks.pollFirst(); + PulsarApiMessageId msgId = pendingIndividualAcks.pollFirst(); if (msgId == null) { break; } @@ -465,12 +450,13 @@ private void flushAsync(ClientCnx cnx) { } if (!pendingIndividualBatchIndexAcks.isEmpty()) { - Iterator> iterator = + Iterator> iterator = pendingIndividualBatchIndexAcks.entrySet().iterator(); while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - entriesToAck.add(Triple.of(entry.getKey().ledgerId, entry.getKey().entryId, entry.getValue())); + Map.Entry entry = iterator.next(); + entriesToAck.add(Triple.of( + entry.getKey().getLedgerId(), entry.getKey().getEntryId(), entry.getValue())); iterator.remove(); } } @@ -509,7 +495,7 @@ public void close() { } } - private CompletableFuture newImmediateAckAndFlush(long consumerId, MessageIdImpl msgId, + private CompletableFuture newImmediateAckAndFlush(long consumerId, PulsarApiMessageId msgId, BitSetRecyclable bitSet, AckType ackType, Map map, ClientCnx cnx) { MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId); @@ -535,7 +521,8 @@ private CompletableFuture newImmediateAckAndFlush(long consumerId, Message completableFuture = CompletableFuture.completedFuture(null); } } else { - completableFuture = newMessageAckCommandAndWrite(cnx, consumerId, msgId.ledgerId, msgId.getEntryId(), + completableFuture = newMessageAckCommandAndWrite(cnx, consumerId, + msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType, map, true, null, null); } return completableFuture; @@ -623,11 +610,11 @@ protected LastCumulativeAck initialValue() { }; public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest; - private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID; + private volatile PulsarApiMessageId messageId = DEFAULT_MESSAGE_ID; private BitSetRecyclable bitSetRecyclable = null; private boolean flushRequired = false; - public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) { + public synchronized void update(final PulsarApiMessageId messageId, final BitSetRecyclable bitSetRecyclable) { if (compareTo(messageId) < 0) { if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) { this.bitSetRecyclable.recycle(); @@ -662,25 +649,11 @@ public synchronized void reset() { flushRequired = false; } - public synchronized int compareTo(MessageId messageId) { - if (this.messageId instanceof BatchMessageIdImpl && (!(messageId instanceof BatchMessageIdImpl))) { - final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId; - final MessageIdImpl rhs = (MessageIdImpl) messageId; - return MessageIdImpl.messageIdCompare( - lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), lhs.getBatchIndex(), - rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), Integer.MAX_VALUE); - } else if (messageId instanceof BatchMessageIdImpl && (!(this.messageId instanceof BatchMessageIdImpl))){ - final MessageIdImpl lhs = this.messageId; - final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId; - return MessageIdImpl.messageIdCompare( - lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), Integer.MAX_VALUE, - rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), rhs.getBatchIndex()); - } else { - return this.messageId.compareTo(messageId); - } + public synchronized int compareTo(PulsarApiMessageId messageId) { + return PulsarApiMessageId.compare(this.messageId, messageId); } - private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) { + private synchronized void set(final PulsarApiMessageId messageId, final BitSetRecyclable bitSetRecyclable) { this.messageId = messageId; this.bitSetRecyclable = bitSetRecyclable; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index ef3ead790bc61..8742b9e7a680a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -68,6 +68,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.ProducerCryptoFailureAction; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.CryptoException; import org.apache.pulsar.client.api.PulsarClientException.TimeoutException; @@ -837,11 +838,12 @@ protected ByteBuf encryptMessage(MessageMetadata msgMetadata, ByteBuf compressed protected ByteBufPair sendMessage(long producerId, long sequenceId, int numMessages, MessageId messageId, MessageMetadata msgMetadata, ByteBuf compressedPayload) { - if (messageId instanceof MessageIdImpl) { + if (messageId instanceof PulsarApiMessageId) { + PulsarApiMessageId msgId = (PulsarApiMessageId) messageId; return Commands.newSend(producerId, sequenceId, numMessages, getChecksumType(), - ((MessageIdImpl) messageId).getLedgerId(), ((MessageIdImpl) messageId).getEntryId(), - msgMetadata, compressedPayload); + msgId.getLedgerId(), msgId.getEntryId(), msgMetadata, compressedPayload); } else { + // NOTE: The message id could be set in the replicator, in this case, messageId is not a PulsarApiMessageId return Commands.newSend(producerId, sequenceId, numMessages, getChecksumType(), -1, -1, msgMetadata, compressedPayload); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java index 06f79024c4f24..a9fb619cde203 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ResetCursorData.java @@ -22,6 +22,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; @Data @NoArgsConstructor @@ -67,20 +68,14 @@ private ResetCursorData(String position) { } public ResetCursorData(MessageId messageId) { - if (messageId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; - this.ledgerId = batchMessageId.getLedgerId(); - this.entryId = batchMessageId.getEntryId(); - this.batchIndex = batchMessageId.getBatchIndex(); - this.partitionIndex = batchMessageId.partitionIndex; - } else if (messageId instanceof MessageIdImpl) { - MessageIdImpl messageIdImpl = (MessageIdImpl) messageId; - this.ledgerId = messageIdImpl.getLedgerId(); - this.entryId = messageIdImpl.getEntryId(); - this.partitionIndex = messageIdImpl.partitionIndex; - } else if (messageId instanceof TopicMessageIdImpl) { + PulsarApiMessageId msgIdData = (PulsarApiMessageId) messageId; + if (messageId instanceof TopicMessageIdImpl) { throw new IllegalArgumentException("Not supported operation on partitioned-topic"); } + this.ledgerId = msgIdData.getLedgerId(); + this.entryId = msgIdData.getEntryId(); + this.partitionIndex = msgIdData.getPartition(); + this.batchIndex = msgIdData.getBatchIndex(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java index c20960950d54e..f52f0df549477 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java @@ -18,16 +18,19 @@ */ package org.apache.pulsar.client.impl; +import java.util.BitSet; +import javax.annotation.Nullable; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; -public class TopicMessageIdImpl implements MessageId { +public class TopicMessageIdImpl implements PulsarApiMessageId, PreviousMessageAcknowledger { /** This topicPartitionName is get from ConsumerImpl, it contains partition part. */ private final String topicPartitionName; private final String topicName; - private final MessageId messageId; + private final PulsarApiMessageId messageId; - public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) { + public TopicMessageIdImpl(String topicPartitionName, String topicName, PulsarApiMessageId messageId) { this.messageId = messageId; this.topicPartitionName = topicPartitionName; this.topicName = topicName; @@ -49,6 +52,7 @@ public String getTopicPartitionName() { return this.topicPartitionName; } + @Deprecated public MessageId getInnerMessageId() { return messageId; } @@ -77,4 +81,45 @@ public boolean equals(Object obj) { public int compareTo(MessageId o) { return messageId.compareTo(o); } + + @Override + public long getLedgerId() { + return messageId.getLedgerId(); + } + + @Override + public long getEntryId() { + return messageId.getEntryId(); + } + + @Override + public int getPartition() { + return messageId.getPartition(); + } + + @Override + public int getBatchIndex() { + return messageId.getBatchIndex(); + } + + @Override + public BitSet getAckSet() { + return messageId.getAckSet(); + } + + @Override + public int getBatchSize() { + return messageId.getBatchSize(); + } + + @Nullable + @Override + public PulsarApiMessageId getFirstChunkMessageId() { + return messageId.getFirstChunkMessageId(); + } + + @Override + public boolean canAckPreviousMessage() { + return ((PreviousMessageAcknowledger) messageId).canAckPreviousMessage(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index f6c33cc930ff6..cf0d9111eacdd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -22,6 +22,7 @@ import java.util.Optional; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.EncryptionContext; @@ -43,7 +44,8 @@ public class TopicMessageImpl implements Message { this.receivedByconsumer = receivedByConsumer; this.msg = msg; - this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, msg.getMessageId()); + this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, + (PulsarApiMessageId) msg.getMessageId()); } /** @@ -68,6 +70,7 @@ public MessageId getMessageId() { return messageId; } + @Deprecated public MessageId getInnerMessageId() { return messageId.getInnerMessageId(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java index 60cdad8e77200..7d2ba83e47490 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java @@ -18,10 +18,27 @@ */ package org.apache.pulsar.client.util; +import java.util.BitSet; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.impl.MessageIdImpl; public class MessageIdUtils { + + public static boolean acknowledge(PulsarApiMessageId msgId, boolean individual) { + BitSet ackSet = msgId.getAckSet(); + int batchIndex = msgId.getBatchIndex(); + if (ackSet == null || batchIndex < 0) { + return true; + } + if (individual) { + ackSet.clear(batchIndex); + } else { + ackSet.clear(0, batchIndex + 1); + } + return ackSet.isEmpty(); + } + public static final long getOffset(MessageId messageId) { MessageIdImpl msgId = (MessageIdImpl) messageId; long ledgerId = msgId.getLedgerId(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index ddca6951e49e1..8edb2f1ec8bd5 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -37,6 +37,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -61,7 +63,7 @@ public void setup() throws NoSuchFieldException, IllegalAccessException { eventLoopGroup = new NioEventLoopGroup(1); consumer = mock(ConsumerImpl.class); consumer.unAckedChunkedMessageIdSequenceMap = - ConcurrentOpenHashMap.newBuilder().build(); + ConcurrentOpenHashMap.newBuilder().build(); cnx = spy(new ClientCnxTest(new ClientConfigurationData(), eventLoopGroup)); PulsarClientImpl client = mock(PulsarClientImpl.class); doReturn(client).when(consumer).getClient(); @@ -391,14 +393,14 @@ public void testBatchAckTrackerMultiAck(boolean isNeedReceipt) throws Exception public void testDoIndividualBatchAckAsync() throws Exception{ ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); AcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); - MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, BatchMessageAckerDisabled.INSTANCE); + MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, (BitSet) null); BitSet bitSet = new BitSet(20); for(int i = 0; i < 20; i ++) { bitSet.set(i, true); } - MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, BatchMessageAcker.newAcker(bitSet)); + MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, bitSet); Method doIndividualBatchAckAsync = PersistentAcknowledgmentsGroupingTracker.class - .getDeclaredMethod("doIndividualBatchAckAsync", BatchMessageIdImpl.class); + .getDeclaredMethod("doIndividualBatchAckAsync", PulsarApiMessageId.class); doIndividualBatchAckAsync.setAccessible(true); doIndividualBatchAckAsync.invoke(tracker, messageId1); doIndividualBatchAckAsync.invoke(tracker, messageId2); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java index 6bf9cd943483f..5aa1dcff2e618 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageIdImplTest.java @@ -20,7 +20,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectWriter; @@ -133,10 +132,8 @@ public void deserializationTest() { try { writer.writeValueAsString(batchMsgId); - fail("Shouldn't be deserialized"); } catch (JsonProcessingException e) { - // expected - assertTrue(e.getCause() instanceof NullPointerException); + fail("Should be successful"); } // use the default BatchMessageAckerDisabled diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java index 7f029635241de..d9f91c5e68143 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdSerializationTest.java @@ -20,6 +20,8 @@ import static org.testng.Assert.assertEquals; import java.io.IOException; +import java.util.BitSet; + import org.apache.pulsar.client.api.MessageId; import org.testng.annotations.Test; @@ -44,7 +46,7 @@ public void testProtobufSerialization2() throws Exception { @Test public void testBatchSizeNotSet() throws Exception { MessageId id = new BatchMessageIdImpl(1L, 2L, 3, 4, -1, - BatchMessageAckerDisabled.INSTANCE); + (BitSet) null); byte[] serialized = id.toByteArray(); assertEquals(MessageId.fromByteArray(serialized), id); assertEquals(MessageId.fromByteArrayWithTopic(serialized, "my-topic"), id); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/api/PulsarApiMessageId.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/PulsarApiMessageId.java new file mode 100644 index 0000000000000..81429ffc7aa6a --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/PulsarApiMessageId.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import com.google.common.collect.ComparisonChain; +import java.util.BitSet; +import javax.annotation.Nullable; + +/** + * The interface to retrieve any field of {@link org.apache.pulsar.common.api.proto.MessageIdData}. + *

+ * See the MessageIdData defined in `PulsarApi.proto`. + */ +public interface PulsarApiMessageId extends MessageId { + + long getLedgerId(); + + long getEntryId(); + + default int getPartition() { + return -1; + } + + default int getBatchIndex() { + return -1; + } + + default @Nullable BitSet getAckSet() { + return null; + } + + default int getBatchSize() { + return 0; + } + + default @Nullable PulsarApiMessageId getFirstChunkMessageId() { + return null; + } + + default boolean isBatch() { + return getBatchIndex() >= 0 && getBatchSize() > 0; + } + + @Override + default int compareTo(MessageId o) { + if (!(o instanceof PulsarApiMessageId)) { + throw new UnsupportedOperationException("Unknown MessageId type: " + + ((o != null) ? o.getClass().getName() : "null")); + } + return legacyCompare(this, (PulsarApiMessageId) o); + } + + // The legacy compare method, which treats the non-batched message id as preceding the batched message id. + // However, this behavior is wrong because a non-batched message id represents an entry, while a batched message + // represents a single message in the entry, which should precedes the message id. + // Keep this implementation just for backward compatibility when users compare two message ids. + static int legacyCompare(PulsarApiMessageId lhs, PulsarApiMessageId rhs) { + return ComparisonChain.start() + .compare(lhs.getLedgerId(), rhs.getLedgerId()) + .compare(lhs.getEntryId(), rhs.getEntryId()) + .compare(lhs.getPartition(), rhs.getPartition()) + .compare(lhs.getBatchIndex(), rhs.getBatchIndex()) + .result(); + } + + static int compare(PulsarApiMessageId lhs, PulsarApiMessageId rhs) { + return ComparisonChain.start() + .compare(lhs.getLedgerId(), rhs.getLedgerId()) + .compare(lhs.getEntryId(), rhs.getEntryId()) + .compare(lhs.getPartition(), rhs.getPartition()) + .compare( + (lhs.getBatchIndex() < 0) ? Integer.MAX_VALUE : lhs.getBatchIndex(), + (rhs.getBatchIndex() < 0) ? Integer.MAX_VALUE : rhs.getBatchIndex()) + .result(); + } + + static boolean equals(PulsarApiMessageId lhs, PulsarApiMessageId rhs) { + return legacyCompare(lhs, rhs) == 0; + } + + static int hashCode(PulsarApiMessageId id) { + return (int) (31 * (id.getLedgerId() + 31 * id.getEntryId()) + (31 * (long) id.getPartition()) + + id.getBatchIndex()); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java new file mode 100644 index 0000000000000..9a7bba8a8d257 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * The additional classes to the pulsar-client-api module. + */ +package org.apache.pulsar.client.api; diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java index 96f6f4708a7f1..985d5677a75b1 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java @@ -45,9 +45,9 @@ import net.jodah.typetools.TypeResolver; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarApiMessageId; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Utils; import org.apache.pulsar.common.nar.NarClassLoader; @@ -315,11 +315,8 @@ public static String getFullyQualifiedInstanceId(String tenant, String namespace } public static final long getSequenceId(MessageId messageId) { - MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl) - ? ((TopicMessageIdImpl) messageId).getInnerMessageId() - : messageId); - long ledgerId = msgId.getLedgerId(); - long entryId = msgId.getEntryId(); + long ledgerId = ((PulsarApiMessageId) messageId).getLedgerId(); + long entryId = ((PulsarApiMessageId) messageId).getEntryId(); // Combine ledger id and entry id to form offset // Use less than 32 bits to represent entry id since it will get diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java index 37d0987e61023..171f2542a51a5 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java @@ -26,6 +26,7 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -365,6 +366,23 @@ static class BatchMessageSequenceRef { int batchIdx; } + private static Method getMethodOfMessageId(MessageId messageId, String name) throws NoSuchMethodException { + Class clazz = messageId.getClass(); + NoSuchMethodException firstException = null; + while (clazz != null) { + try { + return clazz.getDeclaredMethod(name); + } catch (NoSuchMethodException e) { + if (firstException == null) { + firstException = e; + } + clazz = clazz.getSuperclass(); + } + } + assert firstException != null; + throw firstException; + } + @VisibleForTesting static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId messageId) { long ledgerId; @@ -372,23 +390,17 @@ static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId me int batchIdx; try { try { - messageId = (MessageId) messageId.getClass().getDeclaredMethod("getInnerMessageId").invoke(messageId); - } catch (NoSuchMethodException noSuchMethodException) { - // not a TopicMessageIdImpl - } - - try { - batchIdx = (int) messageId.getClass().getDeclaredMethod("getBatchIndex").invoke(messageId); + batchIdx = (int) getMethodOfMessageId(messageId, "getBatchIndex").invoke(messageId); + if (batchIdx < 0) { + return null; + } } catch (NoSuchMethodException noSuchMethodException) { // not a BatchMessageIdImpl, returning null to use the standard sequenceId return null; } - // if getBatchIndex exists it means messageId is a 'BatchMessageIdImpl' instance. - final Class messageIdImplClass = messageId.getClass().getSuperclass(); - - ledgerId = (long) messageIdImplClass.getDeclaredMethod("getLedgerId").invoke(messageId); - entryId = (long) messageIdImplClass.getDeclaredMethod("getEntryId").invoke(messageId); + ledgerId = (long) getMethodOfMessageId(messageId, "getLedgerId").invoke(messageId); + entryId = (long) getMethodOfMessageId(messageId, "getEntryId").invoke(messageId); } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException ex) { log.error("Unexpected error while retrieving sequenceId, messageId class: {}, error: {}", messageId.getClass().getName(), ex.getMessage(), ex);