Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more settings to Consumer in PulsarProperties #223

Merged
merged 1 commit into from
Nov 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;

Expand Down Expand Up @@ -132,6 +135,16 @@ public static class Consumer {
*/
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;

/**
* Map of properties to add to the subscription.
*/
private Map<String, String> subscriptionProperties = new HashMap<>();

/**
* Subscription mode to be used when subscribing to the topic.
*/
private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;

/**
* Number of messages that can be accumulated before the consumer calls "receive".
*/
Expand Down Expand Up @@ -205,17 +218,60 @@ public static class Consumer {
*/
private RegexSubscriptionMode regexSubscriptionMode = RegexSubscriptionMode.PersistentOnly;

/**
* Dead letter policy to use.
*/
@Nullable
private DeadLetterPolicy deadLetterPolicy;

/**
* Whether to auto retry messages.
*/
private Boolean retryEnable = false;

/**
* Whether the consumer auto-subscribes for partition increase. This is only for
* partitioned consumers.
*/
private Boolean autoUpdatePartitions = true;

/**
* Interval of partitions discovery updates.
*/
private Duration autoUpdatePartitionsInterval = Duration.ofMinutes(1);

/**
* Whether to replicate subscription state.
*/
private Boolean replicateSubscriptionState = false;

/**
* Whether to include the given position of any reset operation like
* {@link org.apache.pulsar.client.api.Consumer#seek(long) or
* {@link Consumer#seek(MessageId)}}.
*/
private Boolean resetIncludeHead = false;

/**
* Whether the batch index acknowledgment is enabled.
*/
private Boolean batchIndexAckEnabled = false;

/**
* Whether an acknowledgement receipt is enabled.
*/
private Boolean ackReceiptEnabled = false;

/**
* Whether pooling of messages and the underlying data buffers is enabled.
*/
private Boolean poolMessages = false;

/**
* Whether to start the consumer in a paused state.
*/
private Boolean startPaused = false;

/**
* Whether to automatically drop outstanding un-acked messages if the queue is
* full.
Expand Down Expand Up @@ -257,6 +313,22 @@ public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}

public Map<String, String> getSubscriptionProperties() {
return this.subscriptionProperties;
}

public void setSubscriptionProperties(Map<String, String> subscriptionProperties) {
this.subscriptionProperties = subscriptionProperties;
}

public SubscriptionMode getSubscriptionMode() {
return this.subscriptionMode;
}

public void setSubscriptionMode(SubscriptionMode subscriptionMode) {
this.subscriptionMode = subscriptionMode;
}

public SubscriptionType getSubscriptionType() {
return this.subscriptionType;
}
Expand Down Expand Up @@ -345,7 +417,7 @@ public void setProperties(SortedMap<String, String> properties) {
this.properties = properties;
}

public Boolean isReadCompacted() {
public Boolean getReadCompacted() {
return this.readCompacted;
}

Expand Down Expand Up @@ -377,23 +449,88 @@ public void setRegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode
this.regexSubscriptionMode = regexSubscriptionMode;
}

public Boolean isAutoUpdatePartitions() {
@Nullable
public DeadLetterPolicy getDeadLetterPolicy() {
return this.deadLetterPolicy;
}

public void setDeadLetterPolicy(@Nullable DeadLetterPolicy deadLetterPolicy) {
this.deadLetterPolicy = deadLetterPolicy;
}

public Boolean getRetryEnable() {
return this.retryEnable;
}

public void setRetryEnable(Boolean retryEnable) {
this.retryEnable = retryEnable;
}

public Boolean getAutoUpdatePartitions() {
return this.autoUpdatePartitions;
}

public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) {
this.autoUpdatePartitions = autoUpdatePartitions;
}

public Boolean isReplicateSubscriptionState() {
public Duration getAutoUpdatePartitionsInterval() {
return this.autoUpdatePartitionsInterval;
}

public void setAutoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
this.autoUpdatePartitionsInterval = autoUpdatePartitionsInterval;
}

public Boolean getReplicateSubscriptionState() {
return this.replicateSubscriptionState;
}

public void setReplicateSubscriptionState(Boolean replicateSubscriptionState) {
this.replicateSubscriptionState = replicateSubscriptionState;
}

public Boolean isAutoAckOldestChunkedMessageOnQueueFull() {
public Boolean getResetIncludeHead() {
return this.resetIncludeHead;
}

public void setResetIncludeHead(Boolean resetIncludeHead) {
this.resetIncludeHead = resetIncludeHead;
}

public Boolean getBatchIndexAckEnabled() {
return this.batchIndexAckEnabled;
}

public void setBatchIndexAckEnabled(Boolean batchIndexAckEnabled) {
this.batchIndexAckEnabled = batchIndexAckEnabled;
}

public Boolean getAckReceiptEnabled() {
return this.ackReceiptEnabled;
}

public void setAckReceiptEnabled(Boolean ackReceiptEnabled) {
this.ackReceiptEnabled = ackReceiptEnabled;
}

public Boolean getPoolMessages() {
return this.poolMessages;
}

public void setPoolMessages(Boolean poolMessages) {
this.poolMessages = poolMessages;
}

public Boolean getStartPaused() {
return this.startPaused;
}

public void setStartPaused(Boolean startPaused) {
this.startPaused = startPaused;
}

public Boolean getAutoAckOldestChunkedMessageOnQueueFull() {
return this.autoAckOldestChunkedMessageOnQueueFull;
}

Expand Down Expand Up @@ -426,6 +563,8 @@ public Map<String, Object> buildProperties() {
map.from(this::getTopicsPattern).to(properties.in("topicsPattern"));
map.from(this::getSubscriptionName).to(properties.in("subscriptionName"));
map.from(this::getSubscriptionType).to(properties.in("subscriptionType"));
map.from(this::getSubscriptionProperties).to(properties.in("subscriptionProperties"));
map.from(this::getSubscriptionMode).to(properties.in("subscriptionMode"));
map.from(this::getReceiverQueueSize).to(properties.in("receiverQueueSize"));
map.from(this::getAcknowledgementsGroupTime).as(it -> it.toNanos() / 1000)
.to(properties.in("acknowledgementsGroupTimeMicros"));
Expand All @@ -439,13 +578,22 @@ public Map<String, Object> buildProperties() {
map.from(this::getPriorityLevel).to(properties.in("priorityLevel"));
map.from(this::getCryptoFailureAction).to(properties.in("cryptoFailureAction"));
map.from(this::getProperties).to(properties.in("properties"));
map.from(this::isReadCompacted).to(properties.in("readCompacted"));
map.from(this::getReadCompacted).to(properties.in("readCompacted"));
map.from(this::getSubscriptionInitialPosition).to(properties.in("subscriptionInitialPosition"));
map.from(this::getPatternAutoDiscoveryPeriod).to(properties.in("patternAutoDiscoveryPeriod"));
map.from(this::getRegexSubscriptionMode).to(properties.in("regexSubscriptionMode"));
map.from(this::isAutoUpdatePartitions).to(properties.in("autoUpdatePartitions"));
map.from(this::isReplicateSubscriptionState).to(properties.in("replicateSubscriptionState"));
map.from(this::isAutoAckOldestChunkedMessageOnQueueFull)
map.from(this::getDeadLetterPolicy).to(properties.in("deadLetterPolicy"));
map.from(this::getRetryEnable).to(properties.in("retryEnable"));
map.from(this::getAutoUpdatePartitions).to(properties.in("autoUpdatePartitions"));
map.from(this::getAutoUpdatePartitionsInterval).as(Duration::toSeconds)
.to(properties.in("autoUpdatePartitionsIntervalSeconds"));
map.from(this::getReplicateSubscriptionState).to(properties.in("replicateSubscriptionState"));
map.from(this::getResetIncludeHead).to(properties.in("resetIncludeHead"));
map.from(this::getBatchIndexAckEnabled).to(properties.in("batchIndexAckEnabled"));
map.from(this::getAckReceiptEnabled).to(properties.in("ackReceiptEnabled"));
map.from(this::getPoolMessages).to(properties.in("poolMessages"));
map.from(this::getStartPaused).to(properties.in("startPaused"));
map.from(this::getAutoAckOldestChunkedMessageOnQueueFull)
.to(properties.in("autoAckOldestChunkedMessageOnQueueFull"));
map.from(this::getMaxPendingChunkedMessage).to(properties.in("maxPendingChunkedMessage"));
map.from(this::getExpireTimeOfIncompleteChunkedMessage).as(Duration::toMillis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatNoException;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -27,14 +28,17 @@

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -145,8 +149,8 @@ void producerProperties() {
Map<String, Object> producerProps = properties.buildProducerProperties();

// Verify that the props can be loaded in a ProducerBuilder
ConfigurationDataUtils.loadData(producerProps, new ProducerConfigurationData(),
ProducerConfigurationData.class);
assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(producerProps,
new ProducerConfigurationData(), ProducerConfigurationData.class));

assertThat(producerProps).containsEntry("topicName", "my-topic")
.containsEntry("producerName", "my-producer").containsEntry("sendTimeoutMs", 2_000)
Expand Down Expand Up @@ -181,6 +185,8 @@ void consumerProperties() {
props.put("spring.pulsar.consumer.topics-pattern", "my-pattern");
props.put("spring.pulsar.consumer.subscription-name", "my-subscription");
props.put("spring.pulsar.consumer.subscription-type", "Shared");
props.put("spring.pulsar.consumer.subscription-properties[my-sub-prop]", "my-sub-prop-value");
props.put("spring.pulsar.consumer.subscription-mode", "NonDurable");
props.put("spring.pulsar.consumer.receiver-queue-size", "1");
props.put("spring.pulsar.consumer.acknowledgements-group-time", "2s");
props.put("spring.pulsar.consumer.negative-ack-redelivery-delay", "3s");
Expand All @@ -195,22 +201,40 @@ void consumerProperties() {
props.put("spring.pulsar.consumer.subscription-initial-position", "Earliest");
props.put("spring.pulsar.consumer.pattern-auto-discovery-period", "9");
props.put("spring.pulsar.consumer.regex-subscription-mode", "AllTopics");
props.put("spring.pulsar.consumer.dead-letter-policy.max-redeliver-count", "4");
props.put("spring.pulsar.consumer.dead-letter-policy.retry-letter-topic", "my-retry-topic");
props.put("spring.pulsar.consumer.dead-letter-policy.dead-letter-topic", "my-dlt-topic");
props.put("spring.pulsar.consumer.dead-letter-policy.initial-subscription-name", "my-initial-subscription");
props.put("spring.pulsar.consumer.retry-enable", "true");
props.put("spring.pulsar.consumer.auto-update-partitions", "false");
props.put("spring.pulsar.consumer.auto-update-partitions-interval", "10s");
props.put("spring.pulsar.consumer.replicate-subscription-state", "true");
props.put("spring.pulsar.consumer.reset-include-head", "true");
props.put("spring.pulsar.consumer.batch-index-ack-enabled", "true");
props.put("spring.pulsar.consumer.ack-receipt-enabled", "true");
props.put("spring.pulsar.consumer.pool-messages", "true");
props.put("spring.pulsar.consumer.start-paused", "true");
props.put("spring.pulsar.consumer.auto-ack-oldest-chunked-message-on-queue-full", "false");
props.put("spring.pulsar.consumer.max-pending-chunked-message", "11");
props.put("spring.pulsar.consumer.expire-time-of-incomplete-chunked-message", "12s");

bind(props);
Map<String, Object> consumerProps = properties.buildConsumerProperties();

// Verify that the props can be loaded in a ConsumerBuilder
onobc marked this conversation as resolved.
Show resolved Hide resolved
assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(consumerProps,
new ConsumerConfigurationData<>(), ConsumerConfigurationData.class));

assertThat(consumerProps)
.hasEntrySatisfying("topicNames",
n -> assertThat((Collection<String>) n).containsExactly("my-topic"))
.hasEntrySatisfying("topicsPattern", p -> assertThat(p.toString()).isEqualTo("my-pattern"))
.containsEntry("subscriptionName", "my-subscription")
.containsEntry("subscriptionType", SubscriptionType.Shared).containsEntry("receiverQueueSize", 1)
.containsEntry("acknowledgementsGroupTimeMicros", 2_000_000L)
.containsEntry("subscriptionType", SubscriptionType.Shared)
.hasEntrySatisfying("subscriptionProperties",
p -> assertThat((Map<String, String>) p).containsEntry("my-sub-prop", "my-sub-prop-value"))
.containsEntry("subscriptionMode", SubscriptionMode.NonDurable)
.containsEntry("receiverQueueSize", 1).containsEntry("acknowledgementsGroupTimeMicros", 2_000_000L)
.containsEntry("negativeAckRedeliveryDelayMicros", 3_000_000L)
.containsEntry("maxTotalReceiverQueueSizeAcrossPartitions", 5)
.containsEntry("consumerName", "my-consumer").containsEntry("ackTimeoutMillis", 6_000L)
Expand All @@ -222,7 +246,17 @@ void consumerProperties() {
.containsEntry("subscriptionInitialPosition", SubscriptionInitialPosition.Earliest)
.containsEntry("patternAutoDiscoveryPeriod", 9)
.containsEntry("regexSubscriptionMode", RegexSubscriptionMode.AllTopics)
.containsEntry("autoUpdatePartitions", false).containsEntry("replicateSubscriptionState", true)
.hasEntrySatisfying("deadLetterPolicy", dlp -> {
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) dlp;
assertThat(deadLetterPolicy.getMaxRedeliverCount()).isEqualTo(4);
assertThat(deadLetterPolicy.getRetryLetterTopic()).isEqualTo("my-retry-topic");
assertThat(deadLetterPolicy.getDeadLetterTopic()).isEqualTo("my-dlt-topic");
assertThat(deadLetterPolicy.getInitialSubscriptionName()).isEqualTo("my-initial-subscription");
}).containsEntry("retryEnable", true).containsEntry("autoUpdatePartitions", false)
.containsEntry("autoUpdatePartitionsIntervalSeconds", 10L)
.containsEntry("replicateSubscriptionState", true).containsEntry("resetIncludeHead", true)
.containsEntry("batchIndexAckEnabled", true).containsEntry("ackReceiptEnabled", true)
.containsEntry("poolMessages", true).containsEntry("startPaused", true)
.containsEntry("autoAckOldestChunkedMessageOnQueueFull", false)
.containsEntry("maxPendingChunkedMessage", 11)
.containsEntry("expireTimeOfIncompleteChunkedMessageMillis", 12_000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static <T> void loadConf(ConsumerBuilder<T> builder, Map<String, Object>
propertiesCopy.remove("cryptoKeyReader");
propertiesCopy.remove("messageCrypto");
propertiesCopy.remove("batchReceivePolicy");
propertiesCopy.remove("keySharedPolicy");
propertiesCopy.remove("payloadProcessor");

builder.loadConf(propertiesCopy);
Expand All @@ -79,6 +80,8 @@ public static <T> void loadConf(ConsumerBuilder<T> builder, Map<String, Object>
"messageCrypto");
applyValueToBuilderAfterLoadConf(builderConf::getBatchReceivePolicy, builder::batchReceivePolicy, properties,
"batchReceivePolicy");
applyValueToBuilderAfterLoadConf(builderConf::getKeySharedPolicy, builder::keySharedPolicy, properties,
"keySharedPolicy");
applyValueToBuilderAfterLoadConf(builderConf::getPayloadProcessor, builder::messagePayloadProcessor, properties,
"payloadProcessor");
}
Expand Down