Skip to content

Commit

Permalink
Add support for loading non-serializable properties with ConsumerBuil…
Browse files Browse the repository at this point in the history
…der::loadConf
  • Loading branch information
cbornet committed Nov 5, 2022
1 parent a2c1534 commit 4b6b973
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,39 @@ public ConsumerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {

@Override
public ConsumerBuilder<T> loadConf(Map<String, Object> config) {
this.conf = ConfigurationDataUtils.loadData(config, conf, ConsumerConfigurationData.class);
MessageListener<T> messageListener =
(MessageListener<T>) config.getOrDefault("messageListener", this.conf.getMessageListener());
ConsumerEventListener consumerEventListener =
(ConsumerEventListener) config.getOrDefault("consumerEventListener", this.conf.getConsumerEventListener());
RedeliveryBackoff negativeAckRedeliveryBackoff = (RedeliveryBackoff) config
.getOrDefault("negativeAckRedeliveryBackoff", this.conf.getNegativeAckRedeliveryBackoff());
RedeliveryBackoff ackTimeoutRedeliveryBackoff = (RedeliveryBackoff) config
.getOrDefault("ackTimeoutRedeliveryBackoff", this.conf.getAckTimeoutRedeliveryBackoff());
CryptoKeyReader cryptoKeyReader =
(CryptoKeyReader) config.getOrDefault("cryptoKeyReader", this.conf.getCryptoKeyReader());
MessageCrypto messageCrypto =
(MessageCrypto) config.getOrDefault("messageCrypto", this.conf.getMessageCrypto());
BatchReceivePolicy batchReceivePolicy =
(BatchReceivePolicy) config.getOrDefault("batchReceivePolicy", this.conf.getBatchReceivePolicy());
KeySharedPolicy keySharedPolicy =
(KeySharedPolicy) config.getOrDefault("keySharedPolicy", this.conf.getKeySharedPolicy());
MessagePayloadProcessor payloadProcessor =
(MessagePayloadProcessor) config.getOrDefault("payloadProcessor", this.conf.getPayloadProcessor());

ConsumerConfigurationData<T> configurationData =
ConfigurationDataUtils.loadData(config, conf, ConsumerConfigurationData.class);

configurationData.setMessageListener(messageListener);
configurationData.setConsumerEventListener(consumerEventListener);
configurationData.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff);
configurationData.setAckTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff);
configurationData.setCryptoKeyReader(cryptoKeyReader);
configurationData.setMessageCrypto(messageCrypto);
configurationData.setBatchReceivePolicy(batchReceivePolicy);
configurationData.setKeySharedPolicy(keySharedPolicy);
configurationData.setPayloadProcessor(payloadProcessor);

this.conf = configurationData;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,25 @@
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.fail;

/**
Expand Down Expand Up @@ -370,4 +378,95 @@ public void testTopicConsumerBuilder() {
assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue();
assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1);
}

@Test
public void testLoadConf() {
ConsumerBuilderImpl consumerBuilder = new ConsumerBuilderImpl(null, Schema.BYTES);
consumerBuilder
.messageListener((consumer, message) -> {})
.consumerEventListener(mock(ConsumerEventListener.class))
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build())
.ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build())
.cryptoKeyReader(DefaultCryptoKeyReader.builder().build())
.messageCrypto(new MessageCryptoBc("ctx1", true))
.batchReceivePolicy(BatchReceivePolicy.builder().maxNumBytes(1).build())
//.keySharedPolicy(mock(KeySharedPolicy.class))
.messagePayloadProcessor(mock(MessagePayloadProcessor.class));

Map<String, Object> conf = new HashMap<>();
MessageListener messageListener = (consumer, message) -> {};
conf.put("messageListener", messageListener);
ConsumerEventListener consumerEventListener = mock(ConsumerEventListener.class);
conf.put("consumerEventListener", consumerEventListener);
RedeliveryBackoff negativeAckRedeliveryBackoff = MultiplierRedeliveryBackoff.builder().build();
conf.put("negativeAckRedeliveryBackoff", negativeAckRedeliveryBackoff);
RedeliveryBackoff ackTimeoutRedeliveryBackoff = MultiplierRedeliveryBackoff.builder().build();;
conf.put("ackTimeoutRedeliveryBackoff", ackTimeoutRedeliveryBackoff);
CryptoKeyReader cryptoKeyReader = DefaultCryptoKeyReader.builder().build();
conf.put("cryptoKeyReader", cryptoKeyReader);
MessageCrypto messageCrypto = new MessageCryptoBc("ctx2", true);
conf.put("messageCrypto", messageCrypto);
BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(2).build();
conf.put("batchReceivePolicy", batchReceivePolicy);
//KeySharedPolicy keySharedPolicy = mock(KeySharedPolicy.class);
//conf.put("keySharedPolicy", keySharedPolicy);
MessagePayloadProcessor payloadProcessor = mock(MessagePayloadProcessor.class);
conf.put("payloadProcessor", payloadProcessor);

consumerBuilder.loadConf(conf);

ConsumerConfigurationData configurationData = consumerBuilder.getConf();
assertSame(configurationData.getMessageListener(), messageListener);
assertSame(configurationData.getConsumerEventListener(), consumerEventListener);
assertSame(configurationData.getNegativeAckRedeliveryBackoff(), negativeAckRedeliveryBackoff);
assertSame(configurationData.getAckTimeoutRedeliveryBackoff(), ackTimeoutRedeliveryBackoff);
assertSame(configurationData.getMessageListener(), messageListener);
assertSame(configurationData.getMessageCrypto(), messageCrypto);
assertSame(configurationData.getCryptoKeyReader(), cryptoKeyReader);
assertSame(configurationData.getBatchReceivePolicy(), batchReceivePolicy);
//assertSame(configurationData.getKeySharedPolicy(), keySharedPolicy);
assertSame(configurationData.getPayloadProcessor(), payloadProcessor);

}

@Test
public void testLoadConfNotModified() {
MessageListener messageListener = (consumer, message) -> {};
ConsumerEventListener consumerEventListener = mock(ConsumerEventListener.class);
RedeliveryBackoff negativeAckRedeliveryBackoff = MultiplierRedeliveryBackoff.builder().build();
RedeliveryBackoff ackTimeoutRedeliveryBackoff = MultiplierRedeliveryBackoff.builder().build();;
CryptoKeyReader cryptoKeyReader = DefaultCryptoKeyReader.builder().build();
MessageCrypto messageCrypto = new MessageCryptoBc("ctx2", true);
BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(2).build();
//KeySharedPolicy keySharedPolicy = mock(KeySharedPolicy.class);
MessagePayloadProcessor payloadProcessor = mock(MessagePayloadProcessor.class);

ConsumerBuilderImpl consumerBuilder = new ConsumerBuilderImpl(null, Schema.BYTES);
consumerBuilder
.messageListener(messageListener)
.consumerEventListener(consumerEventListener)
.negativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff)
.ackTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff)
.cryptoKeyReader(cryptoKeyReader)
.messageCrypto(messageCrypto)
.batchReceivePolicy(batchReceivePolicy)
//.keySharedPolicy(keySharedPolicy)
.messagePayloadProcessor(payloadProcessor);


consumerBuilder.loadConf(new HashMap<>());

ConsumerConfigurationData configurationData = consumerBuilder.getConf();
assertSame(configurationData.getMessageListener(), messageListener);
assertSame(configurationData.getConsumerEventListener(), consumerEventListener);
assertSame(configurationData.getNegativeAckRedeliveryBackoff(), negativeAckRedeliveryBackoff);
assertSame(configurationData.getAckTimeoutRedeliveryBackoff(), ackTimeoutRedeliveryBackoff);
assertSame(configurationData.getMessageListener(), messageListener);
assertSame(configurationData.getMessageCrypto(), messageCrypto);
assertSame(configurationData.getCryptoKeyReader(), cryptoKeyReader);
assertSame(configurationData.getBatchReceivePolicy(), batchReceivePolicy);
//assertSame(configurationData.getKeySharedPolicy(), keySharedPolicy);
assertSame(configurationData.getPayloadProcessor(), payloadProcessor);

}
}

0 comments on commit 4b6b973

Please sign in to comment.