Skip to content

Commit

Permalink
Fix reader with start latest message id inclusive (apache#329)
Browse files Browse the repository at this point in the history
Fixes apache#356
Fixes apache#419

Motivtions

The changes made by the original PR apache#329 are no longer works.
This PR is trying to fix the bugs and make the test case be more robust.

Moditications

* fix the wrong `pc.startMessageID == lastestMessageID` with `pc.startMessageID.equal(lastestMessageID.(messageID))`
* fix the hanging `pc.requestSeek(msgID.messageID)` with `pc.requestSeekWithoutClear(msgID.messageID)` before the dispatch loop
* make `HasNext()` return true in case of `StartMessageIdInclusive` and `LastestMessageID`
* make `TestReaderLatestInclusiveHasNext` test case be more robust
  • Loading branch information
rueian committed Feb 15, 2021
1 parent 25f3075 commit 85b0163
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 9 deletions.
15 changes: 11 additions & 4 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
pc.log.Info("Created consumer")
pc.setConsumerState(consumerReady)

if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID {
if pc.options.startMessageIDInclusive && pc.startMessageID.equal(lastestMessageID.(messageID)) {
msgID, err := pc.requestGetLastMessageID()
if err != nil {
pc.nackTracker.Close()
Expand All @@ -194,7 +194,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
if msgID.entryID != noMessageEntry {
pc.startMessageID = msgID

err = pc.requestSeek(msgID.messageID)
// use the WithoutClear version because the dispatcher is not started yet
err = pc.requestSeekWithoutClear(msgID.messageID)
if err != nil {
pc.nackTracker.Close()
return nil, err
Expand Down Expand Up @@ -369,8 +370,15 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
defer close(seek.doneCh)
seek.err = pc.requestSeek(seek.msgID.messageID)
}

func (pc *partitionConsumer) requestSeek(msgID messageID) error {
if err := pc.requestSeekWithoutClear(msgID); err != nil {
return err
}
pc.clearMessageChannels()
return nil
}

func (pc *partitionConsumer) requestSeekWithoutClear(msgID messageID) error {
state := pc.getConsumerState()
if state == consumerClosing || state == consumerClosed {
pc.log.WithField("state", state).Error("Consumer is closing or has closed")
Expand All @@ -396,7 +404,6 @@ func (pc *partitionConsumer) requestSeek(msgID messageID) error {
pc.log.WithError(err).Error("Failed to reset to message id")
return err
}
pc.clearMessageChannels()
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions pulsar/reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ func (r *reader) hasMoreMessages() bool {
}

if r.pc.options.startMessageIDInclusive {
if r.pc.startMessageID.messageID.equal(lastestMessageID.(messageID)) {
return r.lastMessageInBroker.isEntryIDValid()
}
return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID)
}

Expand Down
12 changes: 7 additions & 5 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,10 +577,12 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) {
assert.Nil(t, err)
defer reader.Close()

if reader.HasNext() {
msg, err := reader.Next(context.Background())
assert.NoError(t, err)
assert.True(t, reader.HasNext())
msg, err := reader.Next(context.Background())
assert.NoError(t, err)

assert.Equal(t, []byte("hello-9"), msg.Payload())
}
assert.Equal(t, []byte("hello-9"), msg.Payload())
assert.Equal(t, lastMsgID.Serialize(), msg.ID().Serialize())

assert.False(t, reader.HasNext())
}

0 comments on commit 85b0163

Please sign in to comment.