Skip to content

Commit

Permalink
[fix][client] Fix the startMessageId can't be respected as the ChunkM…
Browse files Browse the repository at this point in the history
…essageID (#16154)

### Motivation

This is the same problem as when the consumer inclusive seeks the chunked message.

See more detail in [PIP-107](#12402)

### Modifications

* Use the first chunk message id as the startMessageId when creating the consumer/reader.
  • Loading branch information
RobertIndie authored Jun 22, 2022
1 parent 54e9c75 commit 33cf2d0
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,15 @@ public void testSeekChunkMessages() throws PulsarClientException {
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
}

Reader<byte[]> reader = pulsarClient.newReader()
.topic(topicName)
.startMessageIdInclusive()
.startMessageId(msgIds.get(1))
.create();

Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS);
assertEquals(msgIds.get(1), readMsg.getMessageId());

consumer1.close();
consumer2.close();
producer.close();
Expand Down Expand Up @@ -549,5 +558,5 @@ private String createMessagePayload(int size) {
}
return str.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,14 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
interceptors);
this.consumerId = client.newConsumerId();
this.subscriptionMode = conf.getSubscriptionMode();
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
if (startMessageId != null) {
if (startMessageId instanceof ChunkMessageIdImpl) {
this.startMessageId = new BatchMessageIdImpl(
((ChunkMessageIdImpl) startMessageId).getFirstChunkMessageId());
} else {
this.startMessageId = new BatchMessageIdImpl((MessageIdImpl) startMessageId);
}
}
this.initialStartMessageId = this.startMessageId;
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
AVAILABLE_PERMITS_UPDATER.set(this, 0);
Expand Down

0 comments on commit 33cf2d0

Please sign in to comment.