Skip to content

Commit

Permalink
[improve][client] Make replicateSubscriptionState nullable (#23757)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
(cherry picked from commit 3fce309)
  • Loading branch information
nodece committed Dec 30, 2024
1 parent 84a4bc6 commit 109f042
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1229,8 +1229,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
? subscribe.getStartMessageRollbackDurationSec()
: -1;
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final boolean isReplicated = subscribe.hasReplicateSubscriptionState()
&& subscribe.isReplicateSubscriptionState();
final Boolean isReplicated =
subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null;
final boolean forceTopicCreation = subscribe.isForceTopicCreation();
final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class SubscriptionOption {
private boolean readCompacted;
private CommandSubscribe.InitialPosition initialPosition;
private long startMessageRollbackDurationSec;
private boolean replicatedSubscriptionStateArg;
private Boolean replicatedSubscriptionStateArg;
private KeySharedMeta keySharedMeta;
private Optional<Map<String, String>> subscriptionProperties;
private long consumerEpoch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
return internalSubscribe(option.getCnx(), option.getSubscriptionName(), option.getConsumerId(),
option.getSubType(), option.getPriorityLevel(), option.getConsumerName(),
option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(),
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
option.getStartMessageRollbackDurationSec(), option.getReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null),
option.getSchemaType());
}
Expand All @@ -290,7 +290,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
String consumerName, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted,
long resetStartMessageBackInSec,
boolean replicateSubscriptionState,
Boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
SchemaType schemaType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,29 +135,33 @@ public class PersistentSubscription extends AbstractSubscription {
private volatile Map<String, String> subscriptionProperties;
private volatile CompletableFuture<Void> fenceFuture;
private volatile CompletableFuture<Void> inProgressResetCursorFuture;
private volatile Boolean replicatedControlled;

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
static Map<String, Long> getBaseCursorProperties(Boolean isReplicated) {
return isReplicated != null && isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
}

static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
return cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
}

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
boolean replicated) {
Boolean replicated) {
this(topic, subscriptionName, cursor, replicated, Collections.emptyMap());
}

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
Boolean replicated, Map<String, String> subscriptionProperties) {
this.topic = topic;
this.cursor = cursor;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this);
this.setReplicated(replicated);
if (replicated != null) {
this.setReplicated(replicated);
}
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
Expand Down Expand Up @@ -196,6 +200,7 @@ public boolean isReplicated() {
}

public boolean setReplicated(boolean replicated) {
replicatedControlled = replicated;
ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();

if (!replicated || !config.isEnableReplicatedSubscriptions()) {
Expand Down Expand Up @@ -1515,4 +1520,8 @@ public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl pos

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);

@VisibleForTesting
public Boolean getReplicatedControlled() {
return replicatedControlled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ private void createPersistentSubscriptions() {
} else {
final String subscriptionName = Codec.decode(cursor.getName());
subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
PersistentSubscription.isCursorFromReplicatedSubscription(cursor) ? true : null,
cursor.getCursorProperties()));
// subscription-cursor gets activated by default: deactivate as there is no active subscription
// right now
Expand Down Expand Up @@ -606,7 +606,7 @@ public CompletableFuture<Void> unloadSubscription(@Nonnull String subName) {
}

private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
Boolean replicated, Map<String, String> subscriptionProperties) {
requireNonNull(topicCompactionService);
if (isCompactionSubscription(subscriptionName)
&& topicCompactionService instanceof PulsarTopicCompactionService pulsarTopicCompactionService) {
Expand Down Expand Up @@ -931,7 +931,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(),
option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(),
option.getInitialPosition(), option.getStartMessageRollbackDurationSec(),
option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
option.getReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
option.getConsumerEpoch(), option.getSchemaType());
}
Expand All @@ -943,7 +943,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
Map<String, String> metadata, boolean readCompacted,
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
boolean replicatedSubscriptionStateArg,
Boolean replicatedSubscriptionStateArg,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
long consumerEpoch,
Expand All @@ -954,12 +954,9 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
}

return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;

if (replicatedSubscriptionState
if (replicatedSubscriptionStateArg != null && replicatedSubscriptionStateArg
&& !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
log.warn("[{}] Replicated Subscription is disabled by broker.", getName());
replicatedSubscriptionState = false;
}

if (subType == SubType.Key_Shared
Expand Down Expand Up @@ -1028,7 +1025,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St

CompletableFuture<? extends Subscription> subscriptionFuture = isDurable
? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionState, subscriptionProperties)
replicatedSubscriptionStateArg, subscriptionProperties)
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
startMessageRollbackDurationSec, readCompacted, subscriptionProperties);

Expand Down Expand Up @@ -1125,7 +1122,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
boolean replicated,
Boolean replicated,
Map<String, String> subscriptionProperties) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
Expand Down Expand Up @@ -1156,7 +1153,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
return;
}
}
if (replicated && !subscription.isReplicated()) {
if (replicated != null && replicated && !subscription.isReplicated()) {
// Flip the subscription state
subscription.setReplicated(replicated);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ReplicateSubscriptionTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
}

@DataProvider
public Object[] replicateSubscriptionState() {
return new Object[]{
Boolean.TRUE,
Boolean.FALSE,
null
};
}

@Test(dataProvider = "replicateSubscriptionState")
public void testReplicateSubscriptionState(Boolean replicateSubscriptionState)
throws Exception {
String topic = "persistent://my-property/my-ns/" + System.nanoTime();
String subName = "sub-" + System.nanoTime();
ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName);
if (replicateSubscriptionState != null) {
consumerBuilder.replicateSubscriptionState(replicateSubscriptionState);
}
ConsumerBuilderImpl consumerBuilderImpl = (ConsumerBuilderImpl) consumerBuilder;
assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(), replicateSubscriptionState);
@Cleanup
Consumer<String> ignored = consumerBuilder.subscribe();
CompletableFuture<Optional<Topic>> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic);
assertThat(topicIfExists)
.succeedsWithin(3, TimeUnit.SECONDS)
.matches(optionalTopic -> {
assertTrue(optionalTopic.isPresent());
Topic topicRef = optionalTopic.get();
Subscription subscription = topicRef.getSubscription(subName);
assertNotNull(subscription);
assertTrue(subscription instanceof PersistentSubscription);
PersistentSubscription persistentSubscription = (PersistentSubscription) subscription;
assertEquals(persistentSubscription.getReplicatedControlled(), replicateSubscriptionState);
return true;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
synchronized (this) {
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
conf.isReplicateSubscriptionState(),
conf.getReplicateSubscriptionState(),
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
// Use the current epoch to subscribe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
Expand Down Expand Up @@ -378,7 +379,8 @@ public int getMaxPendingChuckedMessage() {
value = "If `replicateSubscriptionState` is enabled, a subscription state is replicated to geo-replicated"
+ " clusters."
)
private boolean replicateSubscriptionState = false;
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private Boolean replicateSubscriptionState;

private boolean resetIncludeHead = false;

Expand Down Expand Up @@ -434,4 +436,14 @@ public ConsumerConfigurationData<T> clone() {
throw new RuntimeException("Failed to clone ConsumerConfigurationData");
}
}

/**
* Backward compatibility with the old `replicateSubscriptionState` field.
* @deprecated Using {@link #getReplicateSubscriptionState()} instead.
*/
@JsonIgnore
@Deprecated
public boolean isReplicateSubscriptionState() {
return replicateSubscriptionState != null && replicateSubscriptionState;
}
}
Loading

0 comments on commit 109f042

Please sign in to comment.