diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index b803c51bcc1db1..417d0f081b8d88 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -87,7 +87,39 @@ public ConsumerBuilderImpl(PulsarClientImpl client, Schema schema) { @Override public ConsumerBuilder loadConf(Map config) { - this.conf = ConfigurationDataUtils.loadData(config, conf, ConsumerConfigurationData.class); + MessageListener messageListener = + (MessageListener) config.getOrDefault("messageListener", this.conf.getMessageListener()); + ConsumerEventListener consumerEventListener = + (ConsumerEventListener) config.getOrDefault("consumerEventListener", this.conf.getConsumerEventListener()); + RedeliveryBackoff negativeAckRedeliveryBackoff = (RedeliveryBackoff) config + .getOrDefault("negativeAckRedeliveryBackoff", this.conf.getNegativeAckRedeliveryBackoff()); + RedeliveryBackoff ackTimeoutRedeliveryBackoff = (RedeliveryBackoff) config + .getOrDefault("ackTimeoutRedeliveryBackoff", this.conf.getAckTimeoutRedeliveryBackoff()); + CryptoKeyReader cryptoKeyReader = + (CryptoKeyReader) config.getOrDefault("cryptoKeyReader", this.conf.getCryptoKeyReader()); + MessageCrypto messageCrypto = + (MessageCrypto) config.getOrDefault("messageCrypto", this.conf.getMessageCrypto()); + BatchReceivePolicy batchReceivePolicy = + (BatchReceivePolicy) config.getOrDefault("batchReceivePolicy", this.conf.getBatchReceivePolicy()); + KeySharedPolicy keySharedPolicy = + (KeySharedPolicy) config.getOrDefault("keySharedPolicy", this.conf.getKeySharedPolicy()); + MessagePayloadProcessor payloadProcessor = + (MessagePayloadProcessor) config.getOrDefault("payloadProcessor", this.conf.getPayloadProcessor()); + + ConsumerConfigurationData configurationData = + ConfigurationDataUtils.loadData(config, conf, ConsumerConfigurationData.class); + + configurationData.setMessageListener(messageListener); + configurationData.setConsumerEventListener(consumerEventListener); + configurationData.setNegativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff); + configurationData.setAckTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff); + configurationData.setCryptoKeyReader(cryptoKeyReader); + configurationData.setMessageCrypto(messageCrypto); + configurationData.setBatchReceivePolicy(batchReceivePolicy); + configurationData.setKeySharedPolicy(keySharedPolicy); + configurationData.setPayloadProcessor(payloadProcessor); + + this.conf = configurationData; return this; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index 95fb1ae968d355..e973f606515039 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -34,17 +34,25 @@ import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.MessageCrypto; +import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.MessagePayloadProcessor; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertSame; import static org.testng.Assert.fail; /** @@ -370,4 +378,95 @@ public void testTopicConsumerBuilder() { assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue(); assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1); } + + @Test + public void testLoadConf() { + ConsumerBuilderImpl consumerBuilder = new ConsumerBuilderImpl(null, Schema.BYTES); + consumerBuilder + .messageListener((consumer, message) -> {}) + .consumerEventListener(mock(ConsumerEventListener.class)) + .negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build()) + .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build()) + .cryptoKeyReader(DefaultCryptoKeyReader.builder().build()) + .messageCrypto(new MessageCryptoBc("ctx1", true)) + .batchReceivePolicy(BatchReceivePolicy.builder().maxNumBytes(1).build()) + //.keySharedPolicy(mock(KeySharedPolicy.class)) + .messagePayloadProcessor(mock(MessagePayloadProcessor.class)); + + Map conf = new HashMap<>(); + MessageListener messageListener = (consumer, message) -> {}; + conf.put("messageListener", messageListener); + ConsumerEventListener consumerEventListener = mock(ConsumerEventListener.class); + conf.put("consumerEventListener", consumerEventListener); + RedeliveryBackoff negativeAckRedeliveryBackoff = MultiplierRedeliveryBackoff.builder().build(); + conf.put("negativeAckRedeliveryBackoff", negativeAckRedeliveryBackoff); + RedeliveryBackoff ackTimeoutRedeliveryBackoff = MultiplierRedeliveryBackoff.builder().build();; + conf.put("ackTimeoutRedeliveryBackoff", ackTimeoutRedeliveryBackoff); + CryptoKeyReader cryptoKeyReader = DefaultCryptoKeyReader.builder().build(); + conf.put("cryptoKeyReader", cryptoKeyReader); + MessageCrypto messageCrypto = new MessageCryptoBc("ctx2", true); + conf.put("messageCrypto", messageCrypto); + BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(2).build(); + conf.put("batchReceivePolicy", batchReceivePolicy); + //KeySharedPolicy keySharedPolicy = mock(KeySharedPolicy.class); + //conf.put("keySharedPolicy", keySharedPolicy); + MessagePayloadProcessor payloadProcessor = mock(MessagePayloadProcessor.class); + conf.put("payloadProcessor", payloadProcessor); + + consumerBuilder.loadConf(conf); + + ConsumerConfigurationData configurationData = consumerBuilder.getConf(); + assertSame(configurationData.getMessageListener(), messageListener); + assertSame(configurationData.getConsumerEventListener(), consumerEventListener); + assertSame(configurationData.getNegativeAckRedeliveryBackoff(), negativeAckRedeliveryBackoff); + assertSame(configurationData.getAckTimeoutRedeliveryBackoff(), ackTimeoutRedeliveryBackoff); + assertSame(configurationData.getMessageListener(), messageListener); + assertSame(configurationData.getMessageCrypto(), messageCrypto); + assertSame(configurationData.getCryptoKeyReader(), cryptoKeyReader); + assertSame(configurationData.getBatchReceivePolicy(), batchReceivePolicy); + //assertSame(configurationData.getKeySharedPolicy(), keySharedPolicy); + assertSame(configurationData.getPayloadProcessor(), payloadProcessor); + + } + + @Test + public void testLoadConfNotModified() { + MessageListener messageListener = (consumer, message) -> {}; + ConsumerEventListener consumerEventListener = mock(ConsumerEventListener.class); + RedeliveryBackoff negativeAckRedeliveryBackoff = MultiplierRedeliveryBackoff.builder().build(); + RedeliveryBackoff ackTimeoutRedeliveryBackoff = MultiplierRedeliveryBackoff.builder().build();; + CryptoKeyReader cryptoKeyReader = DefaultCryptoKeyReader.builder().build(); + MessageCrypto messageCrypto = new MessageCryptoBc("ctx2", true); + BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(2).build(); + //KeySharedPolicy keySharedPolicy = mock(KeySharedPolicy.class); + MessagePayloadProcessor payloadProcessor = mock(MessagePayloadProcessor.class); + + ConsumerBuilderImpl consumerBuilder = new ConsumerBuilderImpl(null, Schema.BYTES); + consumerBuilder + .messageListener(messageListener) + .consumerEventListener(consumerEventListener) + .negativeAckRedeliveryBackoff(negativeAckRedeliveryBackoff) + .ackTimeoutRedeliveryBackoff(ackTimeoutRedeliveryBackoff) + .cryptoKeyReader(cryptoKeyReader) + .messageCrypto(messageCrypto) + .batchReceivePolicy(batchReceivePolicy) + //.keySharedPolicy(keySharedPolicy) + .messagePayloadProcessor(payloadProcessor); + + + consumerBuilder.loadConf(new HashMap<>()); + + ConsumerConfigurationData configurationData = consumerBuilder.getConf(); + assertSame(configurationData.getMessageListener(), messageListener); + assertSame(configurationData.getConsumerEventListener(), consumerEventListener); + assertSame(configurationData.getNegativeAckRedeliveryBackoff(), negativeAckRedeliveryBackoff); + assertSame(configurationData.getAckTimeoutRedeliveryBackoff(), ackTimeoutRedeliveryBackoff); + assertSame(configurationData.getMessageListener(), messageListener); + assertSame(configurationData.getMessageCrypto(), messageCrypto); + assertSame(configurationData.getCryptoKeyReader(), cryptoKeyReader); + assertSame(configurationData.getBatchReceivePolicy(), batchReceivePolicy); + //assertSame(configurationData.getKeySharedPolicy(), keySharedPolicy); + assertSame(configurationData.getPayloadProcessor(), payloadProcessor); + + } }