Skip to content

Commit

Permalink
[fix][client] Fix exception when calling loadConf on a ConsumerBuilde…
Browse files Browse the repository at this point in the history
…r that has a KeySharedPolicy (apache#18345)

(cherry picked from commit 9c2ec5e)
(cherry picked from commit 5e8a213)
  • Loading branch information
cbornet authored and nicoloboschi committed Jan 11, 2023
1 parent 32450a7 commit 56a1690
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public int getMaxPendingChuckedMessage() {

private boolean resetIncludeHead = false;

@JsonIgnore
private transient KeySharedPolicy keySharedPolicy;

private boolean batchIndexAckEnabled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,52 @@
*/
package org.apache.pulsar.client.impl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

/**
* Unit tests of {@link ConsumerBuilderImpl}.
*/
Expand Down Expand Up @@ -331,4 +355,212 @@ public void testStartPaused() {
consumerBuilderImpl.startPaused(true);
verify(consumerBuilderImpl.getConf()).setStartPaused(true);
}
@Test
public void testLoadConf() throws Exception {
ConsumerBuilderImpl<byte[]> consumerBuilder = createConsumerBuilder();

String jsonConf = ("{\n"
+ " 'topicNames' : [ 'new-topic' ],\n"
+ " 'topicsPattern' : 'new-topics-pattern',\n"
+ " 'subscriptionName' : 'new-subscription',\n"
+ " 'subscriptionType' : 'Key_Shared',\n"
+ " 'subscriptionProperties' : {\n"
+ " 'new-sub-prop' : 'new-sub-prop-value'\n"
+ " },\n"
+ " 'subscriptionMode' : 'NonDurable',\n"
+ " 'receiverQueueSize' : 2,\n"
+ " 'acknowledgementsGroupTimeMicros' : 2,\n"
+ " 'negativeAckRedeliveryDelayMicros' : 2,\n"
+ " 'maxTotalReceiverQueueSizeAcrossPartitions' : 2,\n"
+ " 'consumerName' : 'new-consumer',\n"
+ " 'ackTimeoutMillis' : 2,\n"
+ " 'tickDurationMillis' : 2,\n"
+ " 'priorityLevel' : 2,\n"
+ " 'maxPendingChunkedMessage' : 2,\n"
+ " 'autoAckOldestChunkedMessageOnQueueFull' : true,\n"
+ " 'expireTimeOfIncompleteChunkedMessageMillis' : 2,\n"
+ " 'cryptoFailureAction' : 'DISCARD',\n"
+ " 'properties' : {\n"
+ " 'new-prop' : 'new-prop-value'\n"
+ " },\n"
+ " 'readCompacted' : true,\n"
+ " 'subscriptionInitialPosition' : 'Earliest',\n"
+ " 'patternAutoDiscoveryPeriod' : 2,\n"
+ " 'regexSubscriptionMode' : 'AllTopics',\n"
+ " 'deadLetterPolicy' : {\n"
+ " 'retryLetterTopic' : 'new-retry',\n"
+ " 'initialSubscriptionName' : 'new-dlq-sub',\n"
+ " 'deadLetterTopic' : 'new-dlq',\n"
+ " 'maxRedeliverCount' : 2\n"
+ " },\n"
+ " 'retryEnable' : true,\n"
+ " 'autoUpdatePartitions' : false,\n"
+ " 'autoUpdatePartitionsIntervalSeconds' : 2,\n"
+ " 'replicateSubscriptionState' : true,\n"
+ " 'resetIncludeHead' : true,\n"
+ " 'batchIndexAckEnabled' : true,\n"
+ " 'ackReceiptEnabled' : true,\n"
+ " 'poolMessages' : true,\n"
+ " 'startPaused' : true\n"
+ " }").replace("'", "\"");

Map<String, Object> conf = new ObjectMapper().readValue(jsonConf, new TypeReference<HashMap<String,Object>>() {});

MessageListener<byte[]> messageListener = (consumer, message) -> {};
conf.put("messageListener", messageListener);
ConsumerEventListener consumerEventListener = mock(ConsumerEventListener.class);
conf.put("consumerEventListener", consumerEventListener);
RedeliveryBackoff negativeAckRedeliveryBackoff = MultiplierRedeliveryBackoff.builder().build();
conf.put("negativeAckRedeliveryBackoff", negativeAckRedeliveryBackoff);
RedeliveryBackoff ackTimeoutRedeliveryBackoff = MultiplierRedeliveryBackoff.builder().build();;
conf.put("ackTimeoutRedeliveryBackoff", ackTimeoutRedeliveryBackoff);
CryptoKeyReader cryptoKeyReader = DefaultCryptoKeyReader.builder().build();
conf.put("cryptoKeyReader", cryptoKeyReader);
MessageCrypto messageCrypto = new MessageCryptoBc("ctx2", true);
conf.put("messageCrypto", messageCrypto);
BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(2).build();
conf.put("batchReceivePolicy", batchReceivePolicy);
KeySharedPolicy keySharedPolicy = KeySharedPolicy.stickyHashRange();
conf.put("keySharedPolicy", keySharedPolicy);
MessagePayloadProcessor payloadProcessor = mock(MessagePayloadProcessor.class);
conf.put("payloadProcessor", payloadProcessor);

consumerBuilder.loadConf(conf);

ConsumerConfigurationData<byte[]> configurationData = consumerBuilder.getConf();
assertEquals(configurationData.getTopicNames(), new HashSet<>(Collections.singletonList("new-topic")));
assertEquals(configurationData.getTopicsPattern().pattern(), "new-topics-pattern");
assertEquals(configurationData.getSubscriptionName(), "new-subscription");
assertEquals(configurationData.getSubscriptionType(), SubscriptionType.Key_Shared);
assertThat(configurationData.getSubscriptionProperties()).hasSize(1)
.hasFieldOrPropertyWithValue("new-sub-prop", "new-sub-prop-value");
assertEquals(configurationData.getSubscriptionMode(), SubscriptionMode.NonDurable);
assertEquals(configurationData.getReceiverQueueSize(), 2);
assertEquals(configurationData.getAcknowledgementsGroupTimeMicros(), 2);

assertEquals(configurationData.getNegativeAckRedeliveryDelayMicros(), 2);
assertEquals(configurationData.getMaxTotalReceiverQueueSizeAcrossPartitions(), 2);
assertEquals(configurationData.getConsumerName(), "new-consumer");
assertEquals(configurationData.getAckTimeoutMillis(), 2);
assertEquals(configurationData.getTickDurationMillis(), 2);
assertEquals(configurationData.getPriorityLevel(), 2);
assertEquals(configurationData.getMaxPendingChunkedMessage(), 2);
assertTrue(configurationData.isAutoAckOldestChunkedMessageOnQueueFull());
assertEquals(configurationData.getExpireTimeOfIncompleteChunkedMessageMillis(), 2);
assertEquals(configurationData.getCryptoFailureAction(), ConsumerCryptoFailureAction.DISCARD);
assertThat(configurationData.getProperties()).hasSize(1)
.hasFieldOrPropertyWithValue("new-prop", "new-prop-value");
assertTrue(configurationData.isReadCompacted());
assertEquals(configurationData.getSubscriptionInitialPosition(), SubscriptionInitialPosition.Earliest);
assertEquals(configurationData.getPatternAutoDiscoveryPeriod(), 2);
assertEquals(configurationData.getRegexSubscriptionMode(), RegexSubscriptionMode.AllTopics);
assertEquals(configurationData.getDeadLetterPolicy().getDeadLetterTopic(), "new-dlq");
assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), "new-retry");
assertEquals(configurationData.getDeadLetterPolicy().getInitialSubscriptionName(), "new-dlq-sub");
assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 2);
assertTrue(configurationData.isRetryEnable());
assertFalse(configurationData.isAutoUpdatePartitions());
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2);
assertTrue(configurationData.isReplicateSubscriptionState());
assertTrue(configurationData.isResetIncludeHead());
assertTrue(configurationData.isBatchIndexAckEnabled());
assertTrue(configurationData.isAckReceiptEnabled());
assertTrue(configurationData.isPoolMessages());
assertTrue(configurationData.isStartPaused());

assertNull(configurationData.getMessageListener());
assertNull(configurationData.getConsumerEventListener());
assertNull(configurationData.getNegativeAckRedeliveryBackoff());
assertNull(configurationData.getAckTimeoutRedeliveryBackoff());
assertNull(configurationData.getMessageListener());
assertNull(configurationData.getMessageCrypto());
assertNull(configurationData.getCryptoKeyReader());
assertNull(configurationData.getBatchReceivePolicy());
assertNull(configurationData.getKeySharedPolicy());
assertNull(configurationData.getPayloadProcessor());
}

@Test
public void testLoadConfNotModified() {
ConsumerBuilderImpl<byte[]> consumerBuilder = createConsumerBuilder();

consumerBuilder.loadConf(new HashMap<>());

ConsumerConfigurationData<byte[]> configurationData = consumerBuilder.getConf();
assertEquals(configurationData.getTopicNames(), new HashSet<>(Collections.singletonList("topic")));
assertEquals(configurationData.getTopicsPattern().pattern(), "topics-pattern");
assertEquals(configurationData.getSubscriptionName(), "subscription");
assertEquals(configurationData.getSubscriptionType(), SubscriptionType.Exclusive);
assertThat(configurationData.getSubscriptionProperties()).hasSize(1)
.hasFieldOrPropertyWithValue("sub-prop", "sub-prop-value");
assertEquals(configurationData.getSubscriptionMode(), SubscriptionMode.Durable);
assertEquals(configurationData.getReceiverQueueSize(), 1000);
assertEquals(configurationData.getAcknowledgementsGroupTimeMicros(), TimeUnit.MILLISECONDS.toMicros(100));
assertEquals(configurationData.getNegativeAckRedeliveryDelayMicros(), TimeUnit.MINUTES.toMicros(1));
assertEquals(configurationData.getMaxTotalReceiverQueueSizeAcrossPartitions(), 50000);
assertEquals(configurationData.getConsumerName(), "consumer");
assertEquals(configurationData.getAckTimeoutMillis(), 30000);
assertEquals(configurationData.getTickDurationMillis(), 1000);
assertEquals(configurationData.getPriorityLevel(), 0);
assertEquals(configurationData.getMaxPendingChunkedMessage(), 10);
assertFalse(configurationData.isAutoAckOldestChunkedMessageOnQueueFull());
assertEquals(configurationData.getExpireTimeOfIncompleteChunkedMessageMillis(), TimeUnit.MINUTES.toMillis(1));
assertEquals(configurationData.getCryptoFailureAction(), ConsumerCryptoFailureAction.FAIL);
assertThat(configurationData.getProperties()).hasSize(1)
.hasFieldOrPropertyWithValue("prop", "prop-value");
assertFalse(configurationData.isReadCompacted());
assertEquals(configurationData.getSubscriptionInitialPosition(), SubscriptionInitialPosition.Latest);
assertEquals(configurationData.getPatternAutoDiscoveryPeriod(), 60);
assertEquals(configurationData.getRegexSubscriptionMode(), RegexSubscriptionMode.PersistentOnly);
assertEquals(configurationData.getDeadLetterPolicy().getDeadLetterTopic(), "dlq");
assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), "retry");
assertEquals(configurationData.getDeadLetterPolicy().getInitialSubscriptionName(), "dlq-sub");
assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 1);
assertFalse(configurationData.isRetryEnable());
assertTrue(configurationData.isAutoUpdatePartitions());
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60);
assertFalse(configurationData.isReplicateSubscriptionState());
assertFalse(configurationData.isResetIncludeHead());
assertFalse(configurationData.isBatchIndexAckEnabled());
assertFalse(configurationData.isAckReceiptEnabled());
assertFalse(configurationData.isPoolMessages());
assertFalse(configurationData.isStartPaused());

assertNull(configurationData.getMessageListener());
assertNull(configurationData.getConsumerEventListener());
assertNull(configurationData.getNegativeAckRedeliveryBackoff());
assertNull(configurationData.getAckTimeoutRedeliveryBackoff());
assertNull(configurationData.getMessageListener());
assertNull(configurationData.getMessageCrypto());
assertNull(configurationData.getCryptoKeyReader());
assertNull(configurationData.getBatchReceivePolicy());
assertNull(configurationData.getKeySharedPolicy());
assertNull(configurationData.getPayloadProcessor());
}

private ConsumerBuilderImpl<byte[]> createConsumerBuilder() {
ConsumerBuilderImpl<byte[]> consumerBuilder = new ConsumerBuilderImpl<>(null, Schema.BYTES);
Map<String, String> properties = new HashMap<>();
properties.put("prop", "prop-value");
Map<String, String> subscriptionProperties = new HashMap<>();
subscriptionProperties.put("sub-prop", "sub-prop-value");
consumerBuilder
.topic("topic")
.topicsPattern("topics-pattern")
.subscriptionName("subscription")
.subscriptionProperties(subscriptionProperties)
.messageListener((consumer, message) -> {})
.consumerEventListener(mock(ConsumerEventListener.class))
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build())
.ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build())
.consumerName("consumer")
.cryptoKeyReader(DefaultCryptoKeyReader.builder().build())
.messageCrypto(new MessageCryptoBc("ctx1", true))
.properties(properties)
.deadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic("dlq").retryLetterTopic("retry").initialSubscriptionName("dlq-sub").maxRedeliverCount(1).build())
.batchReceivePolicy(BatchReceivePolicy.builder().maxNumBytes(1).build())
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
.messagePayloadProcessor(mock(MessagePayloadProcessor.class));
return consumerBuilder;
}
}

0 comments on commit 56a1690

Please sign in to comment.