diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index b75a7d771b..031e0a3a6d 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -185,7 +185,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon pc.log.Info("Created consumer") pc.setConsumerState(consumerReady) - if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID { + if pc.options.startMessageIDInclusive && pc.startMessageID.equal(lastestMessageID.(messageID)) { msgID, err := pc.requestGetLastMessageID() if err != nil { pc.nackTracker.Close() @@ -194,7 +194,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon if msgID.entryID != noMessageEntry { pc.startMessageID = msgID - err = pc.requestSeek(msgID.messageID) + // use the WithoutClear version because the dispatcher is not started yet + err = pc.requestSeekWithoutClear(msgID.messageID) if err != nil { pc.nackTracker.Close() return nil, err @@ -369,8 +370,15 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) { defer close(seek.doneCh) seek.err = pc.requestSeek(seek.msgID.messageID) } - func (pc *partitionConsumer) requestSeek(msgID messageID) error { + if err := pc.requestSeekWithoutClear(msgID); err != nil { + return err + } + pc.clearMessageChannels() + return nil +} + +func (pc *partitionConsumer) requestSeekWithoutClear(msgID messageID) error { state := pc.getConsumerState() if state == consumerClosing || state == consumerClosed { pc.log.WithField("state", state).Error("Consumer is closing or has closed") @@ -396,7 +404,6 @@ func (pc *partitionConsumer) requestSeek(msgID messageID) error { pc.log.WithError(err).Error("Failed to reset to message id") return err } - pc.clearMessageChannels() return nil } diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index d76865ef86..79164ffbb0 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -165,6 +165,9 @@ func (r *reader) hasMoreMessages() bool { } if r.pc.options.startMessageIDInclusive { + if r.pc.startMessageID.messageID.equal(lastestMessageID.(messageID)) { + return r.lastMessageInBroker.isEntryIDValid() + } return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID) } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 36204cf284..cd97964a07 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -577,10 +577,12 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) { assert.Nil(t, err) defer reader.Close() - if reader.HasNext() { - msg, err := reader.Next(context.Background()) - assert.NoError(t, err) + assert.True(t, reader.HasNext()) + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) - assert.Equal(t, []byte("hello-9"), msg.Payload()) - } + assert.Equal(t, []byte("hello-9"), msg.Payload()) + assert.Equal(t, lastMsgID.Serialize(), msg.ID().Serialize()) + + assert.False(t, reader.HasNext()) }