Skip to content

Commit

Permalink
[improve][client] Make replicateSubscriptionState nullable
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Dec 19, 2024
1 parent 1c9bf82 commit b388bc2
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1241,8 +1241,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
? subscribe.getStartMessageRollbackDurationSec()
: -1;
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final boolean isReplicated = subscribe.hasReplicateSubscriptionState()
&& subscribe.isReplicateSubscriptionState();
final Boolean isReplicated =
subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null;
final boolean forceTopicCreation = subscribe.isForceTopicCreation();
final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class SubscriptionOption {
private boolean readCompacted;
private CommandSubscribe.InitialPosition initialPosition;
private long startMessageRollbackDurationSec;
private boolean replicatedSubscriptionStateArg;
private Boolean replicatedSubscriptionStateArg;
private KeySharedMeta keySharedMeta;
private Optional<Map<String, String>> subscriptionProperties;
private long consumerEpoch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
return internalSubscribe(option.getCnx(), option.getSubscriptionName(), option.getConsumerId(),
option.getSubType(), option.getPriorityLevel(), option.getConsumerName(),
option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(),
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
option.getStartMessageRollbackDurationSec(), option.getReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null),
option.getSchemaType());
}
Expand All @@ -279,7 +279,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
String consumerName, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted,
long resetStartMessageBackInSec,
boolean replicateSubscriptionState,
Boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
SchemaType schemaType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,29 +133,33 @@ public class PersistentSubscription extends AbstractSubscription {
private volatile Map<String, String> subscriptionProperties;
private volatile CompletableFuture<Void> fenceFuture;
private volatile CompletableFuture<Void> inProgressResetCursorFuture;
private volatile Boolean replicatedControlled;

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
static Map<String, Long> getBaseCursorProperties(Boolean isReplicated) {
return isReplicated != null && isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
}

static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
return cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
}

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
boolean replicated) {
Boolean replicated) {
this(topic, subscriptionName, cursor, replicated, Collections.emptyMap());
}

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
Boolean replicated, Map<String, String> subscriptionProperties) {
this.topic = topic;
this.cursor = cursor;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this);
this.setReplicated(replicated);
if (replicated != null) {
this.setReplicated(replicated);
}
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
Expand Down Expand Up @@ -194,6 +198,7 @@ public boolean isReplicated() {
}

public boolean setReplicated(boolean replicated) {
replicatedControlled = replicated;
ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();

if (!replicated || !config.isEnableReplicatedSubscriptions()) {
Expand Down Expand Up @@ -1557,4 +1562,8 @@ public PositionInPendingAckStats checkPositionInPendingAckState(Position positio

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);

@VisibleForTesting
public Boolean getReplicatedControlled() {
return replicatedControlled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ private void createPersistentSubscriptions() {
} else {
final String subscriptionName = Codec.decode(cursor.getName());
subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
PersistentSubscription.isCursorFromReplicatedSubscription(cursor) ? true : null,
cursor.getCursorProperties()));
// subscription-cursor gets activated by default: deactivate as there is no active subscription
// right now
Expand Down Expand Up @@ -584,7 +584,7 @@ public CompletableFuture<Void> unloadSubscription(@Nonnull String subName) {
}

private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
Boolean replicated, Map<String, String> subscriptionProperties) {
requireNonNull(topicCompactionService);
if (isCompactionSubscription(subscriptionName)
&& topicCompactionService instanceof PulsarTopicCompactionService pulsarTopicCompactionService) {
Expand Down Expand Up @@ -888,7 +888,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(),
option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(),
option.getInitialPosition(), option.getStartMessageRollbackDurationSec(),
option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
option.getReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
option.getConsumerEpoch(), option.getSchemaType());
}
Expand All @@ -900,7 +900,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
Map<String, String> metadata, boolean readCompacted,
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
boolean replicatedSubscriptionStateArg,
Boolean replicatedSubscriptionStateArg,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
long consumerEpoch,
Expand All @@ -911,12 +911,9 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
}

return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;

if (replicatedSubscriptionState
if (replicatedSubscriptionStateArg != null && replicatedSubscriptionStateArg
&& !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
log.warn("[{}] Replicated Subscription is disabled by broker.", getName());
replicatedSubscriptionState = false;
}

if (subType == SubType.Key_Shared
Expand Down Expand Up @@ -985,7 +982,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St

CompletableFuture<? extends Subscription> subscriptionFuture = isDurable
? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionState, subscriptionProperties)
replicatedSubscriptionStateArg, subscriptionProperties)
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
startMessageRollbackDurationSec, readCompacted, subscriptionProperties);

Expand Down Expand Up @@ -1082,7 +1079,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
boolean replicated,
Boolean replicated,
Map<String, String> subscriptionProperties) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
Expand Down Expand Up @@ -1113,7 +1110,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
return;
}
}
if (replicated && !subscription.isReplicated()) {
if (replicated != null && replicated && !subscription.isReplicated()) {
// Flip the subscription state
subscription.setReplicated(replicated);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ReplicateSubscriptionTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
}

@DataProvider
public Object[] replicateSubscriptionState() {
return new Object[]{
Boolean.TRUE,
Boolean.FALSE,
null
};
}

@Test(dataProvider = "replicateSubscriptionState")
public void testReplicateSubscriptionState(Boolean replicateSubscriptionState)
throws Exception {
String topic = "persistent://my-property/my-ns/" + System.nanoTime();
String subName = "sub-" + System.nanoTime();
ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName);
if (replicateSubscriptionState != null) {
consumerBuilder.replicateSubscriptionState(replicateSubscriptionState);
}
ConsumerBuilderImpl consumerBuilderImpl = (ConsumerBuilderImpl) consumerBuilder;
assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(), replicateSubscriptionState);
@Cleanup
Consumer<String> ignored = consumerBuilder.subscribe();
CompletableFuture<Optional<Topic>> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic);
assertThat(topicIfExists)
.succeedsWithin(3, TimeUnit.SECONDS)
.matches(optionalTopic -> {
assertTrue(optionalTopic.isPresent());
Topic topicRef = optionalTopic.get();
Subscription subscription = topicRef.getSubscription(subName);
assertNotNull(subscription);
assertTrue(subscription instanceof PersistentSubscription);
PersistentSubscription persistentSubscription = (PersistentSubscription) subscription;
assertEquals(persistentSubscription.getReplicatedControlled(), replicateSubscriptionState);
return true;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
synchronized (this) {
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
conf.isReplicateSubscriptionState(),
conf.getReplicateSubscriptionState(),
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
// Use the current epoch to subscribe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public int getMaxPendingChuckedMessage() {
value = "If `replicateSubscriptionState` is enabled, a subscription state is replicated to geo-replicated"
+ " clusters."
)
private boolean replicateSubscriptionState = false;
private Boolean replicateSubscriptionState;

private boolean resetIncludeHead = false;

Expand Down Expand Up @@ -437,4 +437,12 @@ public ConsumerConfigurationData<T> clone() {
throw new RuntimeException("Failed to clone ConsumerConfigurationData");
}
}

/**
* @deprecated Using {@link #getReplicateSubscriptionState()} instead.
*/
@Deprecated
public boolean isReplicateSubscriptionState() {
return replicateSubscriptionState != null && replicateSubscriptionState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu

public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
Map<String, String> metadata, boolean readCompacted, boolean isReplicated,
Map<String, String> metadata, boolean readCompacted, Boolean isReplicated,
InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo,
boolean createTopicIfDoesNotExist) {
return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
Expand All @@ -594,7 +594,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu

public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
Map<String, String> metadata, boolean readCompacted, boolean isReplicated,
Map<String, String> metadata, boolean readCompacted, Boolean isReplicated,
InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec,
SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy,
Map<String, String> subscriptionProperties, long consumerEpoch) {
Expand All @@ -610,9 +610,11 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
.setDurable(isDurable)
.setReadCompacted(readCompacted)
.setInitialPosition(subscriptionInitialPosition)
.setReplicateSubscriptionState(isReplicated)
.setForceTopicCreation(createTopicIfDoesNotExist)
.setConsumerEpoch(consumerEpoch);
if (isReplicated != null) {
subscribe.setReplicateSubscriptionState(isReplicated);
}

if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) {
List<KeyValue> keyValues = new ArrayList<>();
Expand Down

0 comments on commit b388bc2

Please sign in to comment.