Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix seek chunked messages #205

Merged
merged 3 commits into from
Mar 9, 2022
Merged

Fix seek chunked messages #205

merged 3 commits into from
Mar 9, 2022

Conversation

RobertIndie
Copy link
Contributor

@RobertIndie RobertIndie commented Mar 2, 2022

Motivation

This is the implementation of apache/pulsar#12402.

Currently, when we send chunked messages, the producer returns the message-id of the last chunk. This can cause some problems. For example, when we use this message-id to seek, it will cause the consumer to consume from the position of the last chunk, and the consumer will mistakenly think that the previous chunks are lost and choose to skip the current message. If we use the inclusive seek, the consumer may skip the first message, which brings the wrong behavior.

Here is the simple code(in java) used to demonstrate the problem.

var msgId = producer.send(...); // eg. return 0:1:-1

var otherMsg = producer.send(...); // return 0:2:-1

consumer.seek(msgId); // inclusive seek

var receiveMsgId = consumer.receive().getMessageId(); // it may skip the
first message and return like 0:2:-1

Assert.assertEquals(msgId, receiveMsgId); // fail

For more context, please see PIP-107

And I find that f# client has already stored all chunk message ids in MessageIds.chunkMessageIds. We can use this field to implement the ChunkMessageId feature like in java.

There is still work left in this PR to serialize the ChunkMessageId. To be consistent with the behavior of the Java client, when we serialize and deserialize messageIDs or compare messageId, the comparison for chunkMessageIds only needs to compare the message id of the first chunk if the message is a chunked message. Like below:

match m.ChunkMessageIds, this.ChunkMessageIds with
| Some mchunkMessageIds, Some thisChunkMessageIds when mchunkMessageIds.Length > 0 && thisChunkMessageIds.Length > 0 ->
                        mchunkMessageIds.[0] = thisChunkMessageIds.[0] // We need to check the first chunk message id if the message is a chunkd message
| _, _ -> true

We need to update the pulsar proto file before proceeding with the rest of the work. What is the correct way to generate the code for the proto? I found that the code I generated using protoc is very different from the existing generated code. Are the parameters not set correctly?

Update: The serialization for the chunk message id is added. This PR is ready for review.

Modification

  • Fix consumer inclusive seek for chunked message
  • Add compare for the first chunk message id in MessageId.

@Lanayx
Copy link
Member

Lanayx commented Mar 2, 2022

The code is generated using this site https://protogen.marcgravell.com/ , you'll also need to update generated modifiers from public to internal.

Comment on lines 176 to 177
|> Async.AwaitTask
|> Async.RunSynchronously]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please rewrite it similar to this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't find a good way to rewrite it. I got some compile errors when I tried it, Could you give me some guidance?

@Lanayx Lanayx merged commit df64c9d into fsprojects:develop Mar 9, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants