-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client] Fix MessageIdUtils cannot handle TopicMessageId #22698
Conversation
@@ -19,11 +19,12 @@ | |||
package org.apache.pulsar.client.util; | |||
|
|||
import org.apache.pulsar.client.api.MessageId; | |||
import org.apache.pulsar.client.api.MessageIdAdv; | |||
import org.apache.pulsar.client.impl.MessageIdImpl; | |||
|
|||
public class MessageIdUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This util class was introduced at very early era. It's better to add the util function to MessageIdAdvUtils
for MessageIdAdv
.
And I don't think it's worth providing a util function for MessageIdAdv
in the Pulsar core repo. MessageIdAdv
is introduced mainly for this reason. The downstream application should cast MessageId
to MessageIdAdv
to access the internal fields. It should not depend on the wrapped APIs from the upstream.
Pulsar does not have the "offset" concept, the same logic is only used in the Kafka sink connector, but it does not call this util method directly. See
Line 376 in e558cfe
long offset = (ledgerId << 28) | entryId; |
To get the offset, it's better to leverage the AppendIndexMetadataInterceptor
like KoP. See the discussion here: streamnative/kop#290
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also used in third-party connectors, not just limited to the kafka sink connector.
Removing the offset
from the pulsar-client looks good to me. I think we can mark it with Deprecate
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Before removing it, we can mark it with @Deprecated
.
In 3rd party applications, it's easy to customize its own implementation via MessageIdAdv
. You can see the example from an early implementation of KoP: https://github.com/streamnative/kop/blob/branch-2.7.4.5/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageIdUtils.java, which is also an example to show the way to compute the offset is not standard.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To leave our discussion context here, I created a new PR to deprecate these APIs: #22747.
There is the same offset implementation in the FunctionCommon
. I don't deprecate them currently because the connector still needs to use it.
…Utils.getMessageId` (#22747) ### Motivation After discussing [here](#22698 (comment)), the pulsar client shouldn't expose the `offset` term to users. ### Modifications - Deprecate `MessageIdUtils.getOffset` and `MessageIdUtils.getMessageId` - For connectors, use `FunctionCommon.getOffset` and `FunctionCommon.getMessageId`
Motivation
PIP-229 introduces an interface change for the messageId which break the behavior in and after Pulsar 3.0.
This leads to the
MessageIdUtils
not be able to handle theTopicMessageId
. It will throw error like:Modifications
Verifying this change
This change added tests.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: