Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix MessageIdUtils cannot handle TopicMessageId #22698

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
package org.apache.pulsar.client.util;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.impl.MessageIdImpl;

public class MessageIdUtils {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This util class was introduced at very early era. It's better to add the util function to MessageIdAdvUtils for MessageIdAdv.

And I don't think it's worth providing a util function for MessageIdAdv in the Pulsar core repo. MessageIdAdv is introduced mainly for this reason. The downstream application should cast MessageId to MessageIdAdv to access the internal fields. It should not depend on the wrapped APIs from the upstream.

Pulsar does not have the "offset" concept, the same logic is only used in the Kafka sink connector, but it does not call this util method directly. See

To get the offset, it's better to leverage the AppendIndexMetadataInterceptor like KoP. See the discussion here: streamnative/kop#290

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also used in third-party connectors, not just limited to the kafka sink connector.
Removing the offset from the pulsar-client looks good to me. I think we can mark it with Deprecate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Before removing it, we can mark it with @Deprecated.

In 3rd party applications, it's easy to customize its own implementation via MessageIdAdv. You can see the example from an early implementation of KoP: https://github.com/streamnative/kop/blob/branch-2.7.4.5/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageIdUtils.java, which is also an example to show the way to compute the offset is not standard.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To leave our discussion context here, I created a new PR to deprecate these APIs: #22747.
There is the same offset implementation in the FunctionCommon. I don't deprecate them currently because the connector still needs to use it.

public static final long getOffset(MessageId messageId) {
MessageIdImpl msgId = (MessageIdImpl) messageId;
MessageIdAdv msgId = (MessageIdAdv) messageId;
long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.util;

import static org.testng.Assert.assertEquals;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.testng.annotations.Test;

public class MessageIdUtilsTest {
@Test
public void testTopicMessageIdGetOffset() {
MessageIdImpl msgId = new MessageIdImpl(1, 2, 3);
TopicMessageIdImpl topicMsgId = new TopicMessageIdImpl("topic", msgId);
long offset = MessageIdUtils.getOffset(topicMsgId);
assertEquals(offset, 268435458L);
}
}
Loading