Skip to content

Commit

Permalink
[fix][client] Move MessageIdAdv to the pulsar-common module
Browse files Browse the repository at this point in the history
### Motivation

apache#19414 does not follow the design
of apache#18950

> Since the aimed developers are Pulsar core developers, it's added in
> the pulsar-common module (PulsarApi.proto is also in this module), not
> the pulsar-client-api module.

The reason is that `TopicMessageId#create` now cannot be a
`MessageIdAdv` if `MessageIdAdv` is not in the `pulsar-client-api`
module.

### Modifications

- Move the `MessageIdAdv` class to the `pulsar-common` module.
- Create a `TopicMessageIdImpl` instance for `TopicMessageId#create` via
  the `DefaultImplementation` class with the overhead of reflection.
  • Loading branch information
BewareMyPower committed Apr 19, 2023
1 parent 9b72302 commit 7be139d
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pulsar.client.api;

import java.util.BitSet;
import org.apache.pulsar.client.internal.DefaultImplementation;

/**
* The MessageId used for a consumer that subscribes multiple topics or partitioned topics.
Expand All @@ -45,84 +45,6 @@ static TopicMessageId create(String topic, MessageId messageId) {
if (messageId instanceof TopicMessageId) {
return (TopicMessageId) messageId;
}
return new Impl(topic, messageId);
}

/**
* The simplest implementation of a TopicMessageId interface.
*/
class Impl implements MessageIdAdv, TopicMessageId {
private final String topic;
private final MessageIdAdv messageId;

public Impl(String topic, MessageId messageId) {
this.topic = topic;
this.messageId = (MessageIdAdv) messageId;
}

@Override
public byte[] toByteArray() {
return messageId.toByteArray();
}

@Override
public String getOwnerTopic() {
return topic;
}

@Override
public long getLedgerId() {
return messageId.getLedgerId();
}

@Override
public long getEntryId() {
return messageId.getEntryId();
}

@Override
public int getPartitionIndex() {
return messageId.getPartitionIndex();
}

@Override
public int getBatchIndex() {
return messageId.getBatchIndex();
}

@Override
public int getBatchSize() {
return messageId.getBatchSize();
}

@Override
public BitSet getAckSet() {
return messageId.getAckSet();
}

@Override
public MessageIdAdv getFirstChunkMessageId() {
return messageId.getFirstChunkMessageId();
}

@Override
public int compareTo(MessageId o) {
return messageId.compareTo(o);
}

@Override
public boolean equals(Object obj) {
return messageId.equals(obj);
}

@Override
public int hashCode() {
return messageId.hashCode();
}

@Override
public String toString() {
return messageId.toString();
}
return DefaultImplementation.getDefaultImplementation().newTopicMessageId(topic, messageId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.client.api.MessagePayloadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
Expand Down Expand Up @@ -252,4 +253,6 @@ static byte[] getBytes(ByteBuffer byteBuffer) {

SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, long timestamp,
Map<String, String> propertiesValue);

TopicMessageId newTopicMessageId(String topic, MessageId messageId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2345,7 +2345,7 @@ public CompletableFuture<MessageId> getLastMessageIdAsync() {
@Override
public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
return getLastMessageIdAsync()
.thenApply(msgId -> Collections.singletonList(TopicMessageId.create(topic, msgId)));
.thenApply(msgId -> Collections.singletonList(new TopicMessageIdImpl(topic, (MessageIdAdv) msgId)));
}

public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;


import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
Expand All @@ -35,9 +34,11 @@
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessagePayloadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
Expand Down Expand Up @@ -387,4 +388,19 @@ public SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type,
Map<String, String> propertiesValue) {
return new SchemaInfoImpl(name, schema, type, timestamp, propertiesValue);
}

@Override
public TopicMessageId newTopicMessageId(String topic, MessageId messageId) {
final MessageIdAdv messageIdAdv;
if (messageId instanceof MessageIdAdv) {
messageIdAdv = (MessageIdAdv) messageId;
} else {
try {
messageIdAdv = (MessageIdAdv) MessageId.fromByteArray(messageId.toByteArray());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return new TopicMessageIdImpl(topic, messageIdAdv);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,27 @@
*/
package org.apache.pulsar.client.impl;

import java.util.BitSet;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.TopicMessageId;

public class TopicMessageIdImpl extends TopicMessageId.Impl {
public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId {

private final String topicName;
private final String ownerTopic;
private final MessageIdAdv msgId;
private final String topicName; // it's never used

public TopicMessageIdImpl(String topic, MessageIdAdv msgId) {
this.ownerTopic = topic;
this.msgId = msgId;
this.topicName = "";
}

@Deprecated
public TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) {
super(topicPartitionName, messageId);
this.msgId = (MessageIdAdv) messageId;
this.ownerTopic = topicPartitionName;
this.topicName = topicName;
}

Expand Down Expand Up @@ -62,4 +74,49 @@ public boolean equals(Object obj) {
public int hashCode() {
return super.hashCode();
}

@Override
public byte[] toByteArray() {
return msgId.toByteArray();
}

@Override
public String getOwnerTopic() {
return ownerTopic;
}

@Override
public long getLedgerId() {
return msgId.getLedgerId();
}

@Override
public long getEntryId() {
return msgId.getEntryId();
}

@Override
public int getPartitionIndex() {
return msgId.getPartitionIndex();
}

@Override
public int getBatchIndex() {
return msgId.getBatchIndex();
}

@Override
public int getBatchSize() {
return msgId.getBatchSize();
}

@Override
public BitSet getAckSet() {
return msgId.getAckSet();
}

@Override
public MessageIdAdv getFirstChunkMessageId() {
return msgId.getFirstChunkMessageId();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Additional helper classes to the pulsar-client-api module.
*/
package org.apache.pulsar.client.api;
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.websocket.data.ConsumerCommand;
Expand Down Expand Up @@ -293,8 +295,8 @@ private void checkResumeReceive() {

private void handleAck(ConsumerCommand command) throws IOException {
// We should have received an ack
TopicMessageId msgId = TopicMessageId.create(topic.toString(),
MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
TopicMessageId msgId = new TopicMessageIdImpl(topic.toString(),
(MessageIdAdv) MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(),
subscription, msgId, getRemote().getInetSocketAddress().toString());
Expand Down

0 comments on commit 7be139d

Please sign in to comment.