Skip to content

Commit

Permalink
[improve][client] Deprecate MessageIdUtils.getOffset and `MessageId…
Browse files Browse the repository at this point in the history
…Utils.getMessageId` (#22747)

### Motivation

After discussing [here](#22698 (comment)),  the pulsar client shouldn't expose the `offset` term to users.

### Modifications

- Deprecate `MessageIdUtils.getOffset` and `MessageIdUtils.getMessageId`
- For connectors, use `FunctionCommon.getOffset` and `FunctionCommon.getMessageId`
  • Loading branch information
RobertIndie authored May 21, 2024
1 parent b5bc390 commit 878a412
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.pulsar.client.impl.MessageIdImpl;

public class MessageIdUtils {
@Deprecated
public static final long getOffset(MessageId messageId) {
MessageIdImpl msgId = (MessageIdImpl) messageId;
long ledgerId = msgId.getLedgerId();
Expand All @@ -34,6 +35,7 @@ public static final long getOffset(MessageId messageId) {
return offset;
}

@Deprecated
public static final MessageId getMessageId(long offset) {
// Demultiplex ledgerId and entryId from offset
long ledgerId = offset >>> 28;
Expand Down
6 changes: 6 additions & 0 deletions pulsar-io/kafka-connect-adaptor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-utils</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.io.core.SinkContext;

@Slf4j
Expand Down Expand Up @@ -150,7 +150,7 @@ private void seekAndUpdateOffset(TopicPartition topicPartition, long offset) {
try {
ctx.seek(desanitizeTopicName.apply(topicPartition.topic()),
topicPartition.partition(),
MessageIdUtils.getMessageId(offset));
FunctionCommon.getMessageId(offset));
} catch (PulsarClientException e) {
log.error("Failed to seek topic {} partition {} offset {}",
topicPartition.topic(), topicPartition.partition(), offset, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
Expand Down Expand Up @@ -303,8 +303,8 @@ public void seekPauseResumeTest() throws Exception {
assertEquals(status.get(), 1);

final TopicPartition tp = new TopicPartition("fake-topic", 0);
assertNotEquals(MessageIdUtils.getOffset(msgId), 0);
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), MessageIdUtils.getOffset(msgId));
assertNotEquals(FunctionCommon.getSequenceId(msgId), 0);
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), FunctionCommon.getSequenceId(msgId));

sink.taskContext.offset(tp, 0);
verify(context, times(1)).seek(Mockito.anyString(), Mockito.anyInt(), any());
Expand Down Expand Up @@ -347,12 +347,12 @@ public void seekPauseResumeWithSanitizeTest() throws Exception {
assertEquals(status.get(), 1);

final TopicPartition tp = new TopicPartition(sink.sanitizeNameIfNeeded(pulsarTopicName, true), 0);
assertNotEquals(MessageIdUtils.getOffset(msgId), 0);
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), MessageIdUtils.getOffset(msgId));
assertNotEquals(FunctionCommon.getSequenceId(msgId), 0);
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), FunctionCommon.getSequenceId(msgId));

sink.taskContext.offset(tp, 0);
verify(context, times(1)).seek(pulsarTopicName,
tp.partition(), MessageIdUtils.getMessageId(0));
tp.partition(), FunctionCommon.getMessageId(0));
assertEquals(sink.currentOffset(tp.topic(), tp.partition()), 0);

sink.taskContext.pause(tp);
Expand Down

0 comments on commit 878a412

Please sign in to comment.