Skip to content

Commit

Permalink
[ISSUE-328] gets last message when LatestMessageID and inclusive (#329)
Browse files Browse the repository at this point in the history
Signed-off-by: Paulo Pereira <[email protected]>

### Motivation

I have a service that when it restarts, it needs to know what was the last message successfully sent to pulsar.
A reader seems the logical place, since we can specify `StartMessageID` as `LatestMessageID()` and `StartMessageIDInclusive`

### Modifications

When the reader is created, verify if it startMessageIDInclusive true and startMessageID == lastestMessageID() and then get the last message id and seek to that message id.
  • Loading branch information
quintans authored Aug 24, 2020
1 parent 5d57012 commit a7e7239
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 8 deletions.
43 changes: 35 additions & 8 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ var (
Help: "Time it takes for application to process messages",
Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
})

lastestMessageID = LatestMessageID()
)

type consumerState int
Expand All @@ -98,6 +100,10 @@ const (
nonDurable
)

const (
noMessageEntry = -1
)

type partitionConsumerOpts struct {
topic string
consumerName string
Expand Down Expand Up @@ -193,6 +199,21 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
pc.log.Info("Created consumer")
pc.state = consumerReady

if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID {
msgID, err := pc.requestGetLastMessageID()
if err != nil {
return nil, err
}
if msgID.entryID != noMessageEntry {
pc.startMessageID = msgID

err = pc.requestSeek(msgID)
if err != nil {
return nil, err
}
}
}

go pc.dispatcher()

go pc.runEventsLoop()
Expand Down Expand Up @@ -252,7 +273,10 @@ func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {

func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
defer close(req.doneCh)
req.msgID, req.err = pc.requestGetLastMessageID()
}

func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
requestID := pc.client.rpcClient.NewRequestID()
cmdGetLastMessageID := &pb.CommandGetLastMessageId{
RequestId: proto.Uint64(requestID),
Expand All @@ -262,11 +286,10 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
if err != nil {
pc.log.WithError(err).Error("Failed to get last message id")
req.err = err
} else {
id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
req.msgID = convertToMessageID(id)
return messageID{}, err
}
id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
return convertToMessageID(id), nil
}

func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
Expand Down Expand Up @@ -342,17 +365,20 @@ func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {

func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
defer close(seek.doneCh)
seek.err = pc.requestSeek(seek.msgID)
}

func (pc *partitionConsumer) requestSeek(msgID messageID) error {
if pc.state == consumerClosing || pc.state == consumerClosed {
pc.log.Error("Consumer was already closed")
return
return nil
}

id := &pb.MessageIdData{}
err := proto.Unmarshal(seek.msgID.Serialize(), id)
err := proto.Unmarshal(msgID.Serialize(), id)
if err != nil {
pc.log.WithError(err).Errorf("deserialize message id error: %s", err.Error())
seek.err = err
return err
}

requestID := pc.client.rpcClient.NewRequestID()
Expand All @@ -365,8 +391,9 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
_, err = pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek)
if err != nil {
pc.log.WithError(err).Error("Failed to reset to message id")
seek.err = err
return err
}
return nil
}

func (pc *partitionConsumer) SeekByTime(time time.Time) error {
Expand Down
63 changes: 63 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,66 @@ func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
assert.Equal(t, []byte(expectMsg), msg.Payload())
}
}

func TestReaderLatestInclusiveHasNext(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
ctx := context.Background()

// create reader on the last message (inclusive)
reader0, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: LatestMessageID(),
StartMessageIDInclusive: true,
})

assert.Nil(t, err)
defer reader0.Close()

assert.False(t, reader0.HasNext())

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
var lastMsgID MessageID
for i := 0; i < 10; i++ {
lastMsgID, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.NoError(t, err)
assert.NotNil(t, lastMsgID)
}

// create reader on the last message (inclusive)
reader, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: LatestMessageID(),
StartMessageIDInclusive: true,
})

assert.Nil(t, err)
defer reader.Close()

var msgID MessageID
if reader.HasNext() {
msg, err := reader.Next(context.Background())
assert.NoError(t, err)

assert.Equal(t, []byte("hello-9"), msg.Payload())
msgID = msg.ID()
}

assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize())
}

0 comments on commit a7e7239

Please sign in to comment.