From 56a16903f74307121735871425ef4df047c8f77e Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Wed, 16 Nov 2022 01:45:24 +0100 Subject: [PATCH] [fix][client] Fix exception when calling loadConf on a ConsumerBuilder that has a KeySharedPolicy (#18345) (cherry picked from commit 9c2ec5e218b4743430d654c84f7e26f663b126f1) (cherry picked from commit 5e8a2138fc0647ee585ef4e6e0dd6f311349eaae) --- .../impl/conf/ConsumerConfigurationData.java | 1 + .../client/impl/ConsumerBuilderImplTest.java | 232 ++++++++++++++++++ 2 files changed, 233 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index cf1f60d8e9358..ab2527b493236 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -149,6 +149,7 @@ public int getMaxPendingChuckedMessage() { private boolean resetIncludeHead = false; + @JsonIgnore private transient KeySharedPolicy keySharedPolicy; private boolean batchIndexAckEnabled = false; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index a04505ddee1d2..ad390c5d132e5 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -18,12 +18,19 @@ */ package org.apache.pulsar.client.impl; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -31,15 +38,32 @@ import java.util.regex.Pattern; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.KeySharedPolicy; +import org.apache.pulsar.client.api.MessageCrypto; +import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.MessagePayloadProcessor; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.RedeliveryBackoff; +import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + /** * Unit tests of {@link ConsumerBuilderImpl}. */ @@ -331,4 +355,212 @@ public void testStartPaused() { consumerBuilderImpl.startPaused(true); verify(consumerBuilderImpl.getConf()).setStartPaused(true); } + @Test + public void testLoadConf() throws Exception { + ConsumerBuilderImpl consumerBuilder = createConsumerBuilder(); + + String jsonConf = ("{\n" + + " 'topicNames' : [ 'new-topic' ],\n" + + " 'topicsPattern' : 'new-topics-pattern',\n" + + " 'subscriptionName' : 'new-subscription',\n" + + " 'subscriptionType' : 'Key_Shared',\n" + + " 'subscriptionProperties' : {\n" + + " 'new-sub-prop' : 'new-sub-prop-value'\n" + + " },\n" + + " 'subscriptionMode' : 'NonDurable',\n" + + " 'receiverQueueSize' : 2,\n" + + " 'acknowledgementsGroupTimeMicros' : 2,\n" + + " 'negativeAckRedeliveryDelayMicros' : 2,\n" + + " 'maxTotalReceiverQueueSizeAcrossPartitions' : 2,\n" + + " 'consumerName' : 'new-consumer',\n" + + " 'ackTimeoutMillis' : 2,\n" + + " 'tickDurationMillis' : 2,\n" + + " 'priorityLevel' : 2,\n" + + " 'maxPendingChunkedMessage' : 2,\n" + + " 'autoAckOldestChunkedMessageOnQueueFull' : true,\n" + + " 'expireTimeOfIncompleteChunkedMessageMillis' : 2,\n" + + " 'cryptoFailureAction' : 'DISCARD',\n" + + " 'properties' : {\n" + + " 'new-prop' : 'new-prop-value'\n" + + " },\n" + + " 'readCompacted' : true,\n" + + " 'subscriptionInitialPosition' : 'Earliest',\n" + + " 'patternAutoDiscoveryPeriod' : 2,\n" + + " 'regexSubscriptionMode' : 'AllTopics',\n" + + " 'deadLetterPolicy' : {\n" + + " 'retryLetterTopic' : 'new-retry',\n" + + " 'initialSubscriptionName' : 'new-dlq-sub',\n" + + " 'deadLetterTopic' : 'new-dlq',\n" + + " 'maxRedeliverCount' : 2\n" + + " },\n" + + " 'retryEnable' : true,\n" + + " 'autoUpdatePartitions' : false,\n" + + " 'autoUpdatePartitionsIntervalSeconds' : 2,\n" + + " 'replicateSubscriptionState' : true,\n" + + " 'resetIncludeHead' : true,\n" + + " 'batchIndexAckEnabled' : true,\n" + + " 'ackReceiptEnabled' : true,\n" + + " 'poolMessages' : true,\n" + + " 'startPaused' : true\n" + + " }").replace("'", "\""); + + Map conf = new ObjectMapper().readValue(jsonConf, new TypeReference>() {}); + + MessageListener messageListener = (consumer, message) -> {}; + conf.put("messageListener", messageListener); + ConsumerEventListener consumerEventListener = mock(ConsumerEventListener.class); + conf.put("consumerEventListener", consumerEventListener); + RedeliveryBackoff negativeAckRedeliveryBackoff = MultiplierRedeliveryBackoff.builder().build(); + conf.put("negativeAckRedeliveryBackoff", negativeAckRedeliveryBackoff); + RedeliveryBackoff ackTimeoutRedeliveryBackoff = MultiplierRedeliveryBackoff.builder().build();; + conf.put("ackTimeoutRedeliveryBackoff", ackTimeoutRedeliveryBackoff); + CryptoKeyReader cryptoKeyReader = DefaultCryptoKeyReader.builder().build(); + conf.put("cryptoKeyReader", cryptoKeyReader); + MessageCrypto messageCrypto = new MessageCryptoBc("ctx2", true); + conf.put("messageCrypto", messageCrypto); + BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(2).build(); + conf.put("batchReceivePolicy", batchReceivePolicy); + KeySharedPolicy keySharedPolicy = KeySharedPolicy.stickyHashRange(); + conf.put("keySharedPolicy", keySharedPolicy); + MessagePayloadProcessor payloadProcessor = mock(MessagePayloadProcessor.class); + conf.put("payloadProcessor", payloadProcessor); + + consumerBuilder.loadConf(conf); + + ConsumerConfigurationData configurationData = consumerBuilder.getConf(); + assertEquals(configurationData.getTopicNames(), new HashSet<>(Collections.singletonList("new-topic"))); + assertEquals(configurationData.getTopicsPattern().pattern(), "new-topics-pattern"); + assertEquals(configurationData.getSubscriptionName(), "new-subscription"); + assertEquals(configurationData.getSubscriptionType(), SubscriptionType.Key_Shared); + assertThat(configurationData.getSubscriptionProperties()).hasSize(1) + .hasFieldOrPropertyWithValue("new-sub-prop", "new-sub-prop-value"); + assertEquals(configurationData.getSubscriptionMode(), SubscriptionMode.NonDurable); + assertEquals(configurationData.getReceiverQueueSize(), 2); + assertEquals(configurationData.getAcknowledgementsGroupTimeMicros(), 2); + + assertEquals(configurationData.getNegativeAckRedeliveryDelayMicros(), 2); + assertEquals(configurationData.getMaxTotalReceiverQueueSizeAcrossPartitions(), 2); + assertEquals(configurationData.getConsumerName(), "new-consumer"); + assertEquals(configurationData.getAckTimeoutMillis(), 2); + assertEquals(configurationData.getTickDurationMillis(), 2); + assertEquals(configurationData.getPriorityLevel(), 2); + assertEquals(configurationData.getMaxPendingChunkedMessage(), 2); + assertTrue(configurationData.isAutoAckOldestChunkedMessageOnQueueFull()); + assertEquals(configurationData.getExpireTimeOfIncompleteChunkedMessageMillis(), 2); + assertEquals(configurationData.getCryptoFailureAction(), ConsumerCryptoFailureAction.DISCARD); + assertThat(configurationData.getProperties()).hasSize(1) + .hasFieldOrPropertyWithValue("new-prop", "new-prop-value"); + assertTrue(configurationData.isReadCompacted()); + assertEquals(configurationData.getSubscriptionInitialPosition(), SubscriptionInitialPosition.Earliest); + assertEquals(configurationData.getPatternAutoDiscoveryPeriod(), 2); + assertEquals(configurationData.getRegexSubscriptionMode(), RegexSubscriptionMode.AllTopics); + assertEquals(configurationData.getDeadLetterPolicy().getDeadLetterTopic(), "new-dlq"); + assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), "new-retry"); + assertEquals(configurationData.getDeadLetterPolicy().getInitialSubscriptionName(), "new-dlq-sub"); + assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 2); + assertTrue(configurationData.isRetryEnable()); + assertFalse(configurationData.isAutoUpdatePartitions()); + assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2); + assertTrue(configurationData.isReplicateSubscriptionState()); + assertTrue(configurationData.isResetIncludeHead()); + assertTrue(configurationData.isBatchIndexAckEnabled()); + assertTrue(configurationData.isAckReceiptEnabled()); + assertTrue(configurationData.isPoolMessages()); + assertTrue(configurationData.isStartPaused()); + + assertNull(configurationData.getMessageListener()); + assertNull(configurationData.getConsumerEventListener()); + assertNull(configurationData.getNegativeAckRedeliveryBackoff()); + assertNull(configurationData.getAckTimeoutRedeliveryBackoff()); + assertNull(configurationData.getMessageListener()); + assertNull(configurationData.getMessageCrypto()); + assertNull(configurationData.getCryptoKeyReader()); + assertNull(configurationData.getBatchReceivePolicy()); + assertNull(configurationData.getKeySharedPolicy()); + assertNull(configurationData.getPayloadProcessor()); + } + + @Test + public void testLoadConfNotModified() { + ConsumerBuilderImpl consumerBuilder = createConsumerBuilder(); + + consumerBuilder.loadConf(new HashMap<>()); + + ConsumerConfigurationData configurationData = consumerBuilder.getConf(); + assertEquals(configurationData.getTopicNames(), new HashSet<>(Collections.singletonList("topic"))); + assertEquals(configurationData.getTopicsPattern().pattern(), "topics-pattern"); + assertEquals(configurationData.getSubscriptionName(), "subscription"); + assertEquals(configurationData.getSubscriptionType(), SubscriptionType.Exclusive); + assertThat(configurationData.getSubscriptionProperties()).hasSize(1) + .hasFieldOrPropertyWithValue("sub-prop", "sub-prop-value"); + assertEquals(configurationData.getSubscriptionMode(), SubscriptionMode.Durable); + assertEquals(configurationData.getReceiverQueueSize(), 1000); + assertEquals(configurationData.getAcknowledgementsGroupTimeMicros(), TimeUnit.MILLISECONDS.toMicros(100)); + assertEquals(configurationData.getNegativeAckRedeliveryDelayMicros(), TimeUnit.MINUTES.toMicros(1)); + assertEquals(configurationData.getMaxTotalReceiverQueueSizeAcrossPartitions(), 50000); + assertEquals(configurationData.getConsumerName(), "consumer"); + assertEquals(configurationData.getAckTimeoutMillis(), 30000); + assertEquals(configurationData.getTickDurationMillis(), 1000); + assertEquals(configurationData.getPriorityLevel(), 0); + assertEquals(configurationData.getMaxPendingChunkedMessage(), 10); + assertFalse(configurationData.isAutoAckOldestChunkedMessageOnQueueFull()); + assertEquals(configurationData.getExpireTimeOfIncompleteChunkedMessageMillis(), TimeUnit.MINUTES.toMillis(1)); + assertEquals(configurationData.getCryptoFailureAction(), ConsumerCryptoFailureAction.FAIL); + assertThat(configurationData.getProperties()).hasSize(1) + .hasFieldOrPropertyWithValue("prop", "prop-value"); + assertFalse(configurationData.isReadCompacted()); + assertEquals(configurationData.getSubscriptionInitialPosition(), SubscriptionInitialPosition.Latest); + assertEquals(configurationData.getPatternAutoDiscoveryPeriod(), 60); + assertEquals(configurationData.getRegexSubscriptionMode(), RegexSubscriptionMode.PersistentOnly); + assertEquals(configurationData.getDeadLetterPolicy().getDeadLetterTopic(), "dlq"); + assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), "retry"); + assertEquals(configurationData.getDeadLetterPolicy().getInitialSubscriptionName(), "dlq-sub"); + assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 1); + assertFalse(configurationData.isRetryEnable()); + assertTrue(configurationData.isAutoUpdatePartitions()); + assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60); + assertFalse(configurationData.isReplicateSubscriptionState()); + assertFalse(configurationData.isResetIncludeHead()); + assertFalse(configurationData.isBatchIndexAckEnabled()); + assertFalse(configurationData.isAckReceiptEnabled()); + assertFalse(configurationData.isPoolMessages()); + assertFalse(configurationData.isStartPaused()); + + assertNull(configurationData.getMessageListener()); + assertNull(configurationData.getConsumerEventListener()); + assertNull(configurationData.getNegativeAckRedeliveryBackoff()); + assertNull(configurationData.getAckTimeoutRedeliveryBackoff()); + assertNull(configurationData.getMessageListener()); + assertNull(configurationData.getMessageCrypto()); + assertNull(configurationData.getCryptoKeyReader()); + assertNull(configurationData.getBatchReceivePolicy()); + assertNull(configurationData.getKeySharedPolicy()); + assertNull(configurationData.getPayloadProcessor()); + } + + private ConsumerBuilderImpl createConsumerBuilder() { + ConsumerBuilderImpl consumerBuilder = new ConsumerBuilderImpl<>(null, Schema.BYTES); + Map properties = new HashMap<>(); + properties.put("prop", "prop-value"); + Map subscriptionProperties = new HashMap<>(); + subscriptionProperties.put("sub-prop", "sub-prop-value"); + consumerBuilder + .topic("topic") + .topicsPattern("topics-pattern") + .subscriptionName("subscription") + .subscriptionProperties(subscriptionProperties) + .messageListener((consumer, message) -> {}) + .consumerEventListener(mock(ConsumerEventListener.class)) + .negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build()) + .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build()) + .consumerName("consumer") + .cryptoKeyReader(DefaultCryptoKeyReader.builder().build()) + .messageCrypto(new MessageCryptoBc("ctx1", true)) + .properties(properties) + .deadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic("dlq").retryLetterTopic("retry").initialSubscriptionName("dlq-sub").maxRedeliverCount(1).build()) + .batchReceivePolicy(BatchReceivePolicy.builder().maxNumBytes(1).build()) + .keySharedPolicy(KeySharedPolicy.autoSplitHashRange()) + .messagePayloadProcessor(mock(MessagePayloadProcessor.class)); + return consumerBuilder; + } }