diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java index 85d67c3de0d41..00e6c2f78e317 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java @@ -520,6 +520,15 @@ public void testSeekChunkMessages() throws PulsarClientException { assertEquals(msgIds.get(i), msgAfterSeek.getMessageId()); } + Reader reader = pulsarClient.newReader() + .topic(topicName) + .startMessageIdInclusive() + .startMessageId(msgIds.get(1)) + .create(); + + Message readMsg = reader.readNext(5, TimeUnit.SECONDS); + assertEquals(msgIds.get(1), readMsg.getMessageId()); + consumer1.close(); consumer2.close(); producer.close(); @@ -549,5 +558,5 @@ private String createMessagePayload(int size) { } return str.toString(); } - + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 9e86770ee8ffa..3e25ba0facbea 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -249,7 +249,14 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat interceptors); this.consumerId = client.newConsumerId(); this.subscriptionMode = conf.getSubscriptionMode(); - this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null; + if (startMessageId != null) { + if (startMessageId instanceof ChunkMessageIdImpl) { + this.startMessageId = new BatchMessageIdImpl( + ((ChunkMessageIdImpl) startMessageId).getFirstChunkMessageId()); + } else { + this.startMessageId = new BatchMessageIdImpl((MessageIdImpl) startMessageId); + } + } this.initialStartMessageId = this.startMessageId; this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec; AVAILABLE_PERMITS_UPDATER.set(this, 0);