From 7be139d6af67999f8ee7d786ebea22fe6be6bd2e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 19 Apr 2023 21:41:17 +0800 Subject: [PATCH] [fix][client] Move MessageIdAdv to the pulsar-common module ### Motivation https://github.com/apache/pulsar/pull/19414 does not follow the design of https://github.com/apache/pulsar/issues/18950 > Since the aimed developers are Pulsar core developers, it's added in > the pulsar-common module (PulsarApi.proto is also in this module), not > the pulsar-client-api module. The reason is that `TopicMessageId#create` now cannot be a `MessageIdAdv` if `MessageIdAdv` is not in the `pulsar-client-api` module. ### Modifications - Move the `MessageIdAdv` class to the `pulsar-common` module. - Create a `TopicMessageIdImpl` instance for `TopicMessageId#create` via the `DefaultImplementation` class with the overhead of reflection. --- .../pulsar/client/api/TopicMessageId.java | 82 +------------------ .../PulsarClientImplementationBinding.java | 3 + .../pulsar/client/impl/ConsumerImpl.java | 2 +- ...PulsarClientImplementationBindingImpl.java | 18 +++- .../client/impl/TopicMessageIdImpl.java | 63 +++++++++++++- .../pulsar/client/api/MessageIdAdv.java | 0 .../pulsar/client/api/package-info.java | 22 +++++ .../pulsar/websocket/ConsumerHandler.java | 6 +- 8 files changed, 109 insertions(+), 87 deletions(-) rename {pulsar-client-api => pulsar-common}/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java (100%) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java index b70267bb0fb8b1..4d02a7f4096d67 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicMessageId.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.client.api; -import java.util.BitSet; +import org.apache.pulsar.client.internal.DefaultImplementation; /** * The MessageId used for a consumer that subscribes multiple topics or partitioned topics. @@ -45,84 +45,6 @@ static TopicMessageId create(String topic, MessageId messageId) { if (messageId instanceof TopicMessageId) { return (TopicMessageId) messageId; } - return new Impl(topic, messageId); - } - - /** - * The simplest implementation of a TopicMessageId interface. - */ - class Impl implements MessageIdAdv, TopicMessageId { - private final String topic; - private final MessageIdAdv messageId; - - public Impl(String topic, MessageId messageId) { - this.topic = topic; - this.messageId = (MessageIdAdv) messageId; - } - - @Override - public byte[] toByteArray() { - return messageId.toByteArray(); - } - - @Override - public String getOwnerTopic() { - return topic; - } - - @Override - public long getLedgerId() { - return messageId.getLedgerId(); - } - - @Override - public long getEntryId() { - return messageId.getEntryId(); - } - - @Override - public int getPartitionIndex() { - return messageId.getPartitionIndex(); - } - - @Override - public int getBatchIndex() { - return messageId.getBatchIndex(); - } - - @Override - public int getBatchSize() { - return messageId.getBatchSize(); - } - - @Override - public BitSet getAckSet() { - return messageId.getAckSet(); - } - - @Override - public MessageIdAdv getFirstChunkMessageId() { - return messageId.getFirstChunkMessageId(); - } - - @Override - public int compareTo(MessageId o) { - return messageId.compareTo(o); - } - - @Override - public boolean equals(Object obj) { - return messageId.equals(obj); - } - - @Override - public int hashCode() { - return messageId.hashCode(); - } - - @Override - public String toString() { - return messageId.toString(); - } + return DefaultImplementation.getDefaultImplementation().newTopicMessageId(topic, messageId); } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java index 875a7930235235..8fd05bff265f1f 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java @@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.MessagePayloadFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; @@ -252,4 +253,6 @@ static byte[] getBytes(ByteBuffer byteBuffer) { SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, long timestamp, Map propertiesValue); + + TopicMessageId newTopicMessageId(String topic, MessageId messageId); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index cc016093196985..199e8a9ae71b46 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2345,7 +2345,7 @@ public CompletableFuture getLastMessageIdAsync() { @Override public CompletableFuture> getLastMessageIdsAsync() { return getLastMessageIdAsync() - .thenApply(msgId -> Collections.singletonList(TopicMessageId.create(topic, msgId))); + .thenApply(msgId -> Collections.singletonList(new TopicMessageIdImpl(topic, (MessageIdAdv) msgId))); } public CompletableFuture internalGetLastMessageIdAsync() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java index 1b069c5172dd7c..346eb20ef4cc5d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.client.impl; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -35,9 +34,11 @@ import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.MessagePayloadFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; @@ -387,4 +388,19 @@ public SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, Map propertiesValue) { return new SchemaInfoImpl(name, schema, type, timestamp, propertiesValue); } + + @Override + public TopicMessageId newTopicMessageId(String topic, MessageId messageId) { + final MessageIdAdv messageIdAdv; + if (messageId instanceof MessageIdAdv) { + messageIdAdv = (MessageIdAdv) messageId; + } else { + try { + messageIdAdv = (MessageIdAdv) MessageId.fromByteArray(messageId.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return new TopicMessageIdImpl(topic, messageIdAdv); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java index 189dc1c608379b..d99e8a53316d35 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java @@ -18,15 +18,27 @@ */ package org.apache.pulsar.client.impl; +import java.util.BitSet; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.TopicMessageId; -public class TopicMessageIdImpl extends TopicMessageId.Impl { +public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId { - private final String topicName; + private final String ownerTopic; + private final MessageIdAdv msgId; + private final String topicName; // it's never used + public TopicMessageIdImpl(String topic, MessageIdAdv msgId) { + this.ownerTopic = topic; + this.msgId = msgId; + this.topicName = ""; + } + + @Deprecated public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) { - super(topicPartitionName, messageId); + this.msgId = (MessageIdAdv) messageId; + this.ownerTopic = topicPartitionName; this.topicName = topicName; } @@ -62,4 +74,49 @@ public boolean equals(Object obj) { public int hashCode() { return super.hashCode(); } + + @Override + public byte[] toByteArray() { + return msgId.toByteArray(); + } + + @Override + public String getOwnerTopic() { + return ownerTopic; + } + + @Override + public long getLedgerId() { + return msgId.getLedgerId(); + } + + @Override + public long getEntryId() { + return msgId.getEntryId(); + } + + @Override + public int getPartitionIndex() { + return msgId.getPartitionIndex(); + } + + @Override + public int getBatchIndex() { + return msgId.getBatchIndex(); + } + + @Override + public int getBatchSize() { + return msgId.getBatchSize(); + } + + @Override + public BitSet getAckSet() { + return msgId.getAckSet(); + } + + @Override + public MessageIdAdv getFirstChunkMessageId() { + return msgId.getFirstChunkMessageId(); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java similarity index 100% rename from pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java rename to pulsar-common/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java b/pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java new file mode 100644 index 00000000000000..3f6d1d56e10320 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Additional helper classes to the pulsar-client-api module. + */ +package org.apache.pulsar.client.api; diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index 579b4233399119..c988fd1e70ce32 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException; @@ -45,6 +46,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.websocket.data.ConsumerCommand; @@ -293,8 +295,8 @@ private void checkResumeReceive() { private void handleAck(ConsumerCommand command) throws IOException { // We should have received an ack - TopicMessageId msgId = TopicMessageId.create(topic.toString(), - MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId))); + TopicMessageId msgId = new TopicMessageIdImpl(topic.toString(), + (MessageIdAdv) MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId))); if (log.isDebugEnabled()) { log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(), subscription, msgId, getRemote().getInetSocketAddress().toString());