Skip to content

Commit

Permalink
[fix][broker] Fix updatePartitionedTopic when replication at ns level…
Browse files Browse the repository at this point in the history
… and topic policy is set (#22971)

(cherry picked from commit 1c44fbb)
  • Loading branch information
lhotari committed Jun 25, 2024
1 parent f10708f commit d93e896
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -532,8 +531,8 @@ protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean
Set<String> replicationClusters = policies.get().replication_clusters;
TopicPolicies topicPolicies =
pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName);
if (topicPolicies != null) {
replicationClusters = new HashSet<>(topicPolicies.getReplicationClusters());
if (topicPolicies != null && topicPolicies.getReplicationClusters() != null) {
replicationClusters = topicPolicies.getReplicationClustersSet();
}
// Do check replicated clusters.
if (replicationClusters.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.junit.Assert;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.junit.Assert;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -461,6 +461,29 @@ public void testPartitionedTopicLevelReplication() throws Exception {
admin2.topics().deletePartitionedTopic(topicName);
}

// https://github.com/apache/pulsar/issues/22967
@Test
public void testPartitionedTopicWithTopicPolicyAndNoReplicationClusters() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
admin1.topics().createPartitionedTopic(topicName, 2);
try {
admin1.topicPolicies().setMessageTTL(topicName, 5);
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 2);
});
admin1.topics().updatePartitionedTopic(topicName, 3, false);
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 3);
});
} finally {
// cleanup.
admin1.topics().deletePartitionedTopic(topicName, true);
if (!usingGlobalZK) {
admin2.topics().deletePartitionedTopic(topicName, true);
}
}
}

@Test
public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
config.setEnableReplicatedSubscriptions(true);
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setLoadBalancerSheddingEnabled(false);
config.setForceDeleteNamespaceAllowed(true);
}

@Override
Expand All @@ -276,11 +277,11 @@ protected void cleanup() throws Exception {
if (!usingGlobalZK) {
admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster2));
}
admin1.namespaces().deleteNamespace(replicatedNamespace);
admin1.namespaces().deleteNamespace(nonReplicatedNamespace);
admin1.namespaces().deleteNamespace(replicatedNamespace, true);
admin1.namespaces().deleteNamespace(nonReplicatedNamespace, true);
if (!usingGlobalZK) {
admin2.namespaces().deleteNamespace(replicatedNamespace);
admin2.namespaces().deleteNamespace(nonReplicatedNamespace);
admin2.namespaces().deleteNamespace(replicatedNamespace, true);
admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true);
}

// shutdown.
Expand Down

0 comments on commit d93e896

Please sign in to comment.