Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Add an option to return 0 when querying partitions of a nonexistent topic #18594

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ allowAutoSubscriptionCreation=true
# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
defaultNumPartitions=1

# Whether to check if a topic exists when querying partitions of the topic.
# It's enabled by default, when a topic is not created, querying the partitions of this topic will throw an exception.
# Otherwise, the queried result is 0. Disable this option to be compatible with some old clients.
checkTopicExistsWhenQueryPartitions=true

# Enable the deletion of inactive topics. This parameter need to cooperate with the allowAutoTopicCreation parameter.
# If brokerDeleteInactiveTopicsEnabled is set to true, we should ensure that allowAutoTopicCreation is also set to true.
brokerDeleteInactiveTopicsEnabled=true
Expand Down
5 changes: 5 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,11 @@ allowAutoSubscriptionCreation=true
# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned.
defaultNumPartitions=1

# Whether to check if a topic exists when querying partitions of the topic.
# It's enabled by default, when a topic is not created, querying the partitions of this topic will throw an exception.
# Otherwise, the queried result is 0. Disable this option to be compatible with some old clients.
checkTopicExistsWhenQueryPartitions=true

### --- Transaction config variables --- ###
# Enable transaction coordinator in broker
transactionCoordinatorEnabled=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,15 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
)
private Set<String> messagingProtocols = new TreeSet<>();

@FieldContext(
category = CATEGORY_PROTOCOLS,
doc = "Whether to check if a topic exists when querying partitions of the topic. "
+ "It's enabled by default, when a topic is not created, querying the partitions of this topic will "
+ "throw an exception. Otherwise, the queried result is 0. "
+ "Disable this option to be compatible with some old clients."
)
private boolean checkTopicExistsWhenQueryPartitions = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable or disable system topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadat
return pulsar().getBrokerService()
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName);
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName, true);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,9 @@ protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMeta
} else {
ret = CompletableFuture.completedFuture(null);
}
return ret.thenApply(__ -> metadata);
return ret.thenApply(__ -> (metadata.partitions < 0)
? new PartitionedTopicMetadata(0, metadata.properties)
: metadata);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2893,6 +2893,11 @@ private void createPendingLoadTopic() {

public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(
TopicName topicName) {
return fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName, false);
}

public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(
TopicName topicName, boolean checkTopicExists) {
if (pulsar.getNamespaceService() == null) {
return FutureUtil.failedFuture(new NamingException("namespace service is not ready"));
}
Expand Down Expand Up @@ -2946,7 +2951,13 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
return null;
});
} else {
future.complete(metadata);
if (checkTopicExists
&& metadata.partitions == 0
&& !topicExists) {
future.complete(new PartitionedTopicMetadata(-1, metadata.properties));
} else {
future.complete(metadata);
}
}
});

Expand Down Expand Up @@ -2980,6 +2991,16 @@ private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopi
}

public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) {
return fetchPartitionedTopicMetadataAsync(topicName, false);
}

public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(
TopicName topicName, boolean mightCheckAllowAutoCreation) {
if (mightCheckAllowAutoCreation && !pulsar.getConfig().isCheckTopicExistsWhenQueryPartitions()) {
// Some old clients might not add the "checkAllowAutoCreation=true" query param. If this option is enabled,
// use the same behavior with that query param.
return fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName, true);
}
// gets the number of partitions from the configuration cache
return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.getPartitionedTopicMetadataAsync(topicName).thenApply(metadata -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.util.concurrent.ExecutionException;
import lombok.Cleanup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Sets;

@Test(groups = "broker-impl")
public class HttpPartitionMetadataLookupTest extends MockedPulsarServiceBaseTest {

private final EventLoopGroup eventExecutors = new NioEventLoopGroup();

@DataProvider
public Object[][] legacyLookup() {
return new Object[][] { {true}, {false} };
}

@BeforeMethod
@Override
protected void setup() throws Exception {
// No ops
}

private void internalSetup(boolean legacy) throws Exception {
if (legacy) {
conf.setCheckTopicExistsWhenQueryPartitions(false);
}
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("prop",
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
admin.namespaces().createNamespace("prop/ns-abc");
admin.namespaces().setNamespaceReplicationClusters("prop/ns-abc", Sets.newHashSet("test"));
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 45000, dataProvider = "legacyLookup")
public void testLegacyLookup(boolean legacy) throws Exception {
internalSetup(legacy);
BinaryProtoLookupService binaryLookup = (BinaryProtoLookupService)
((PulsarClientImpl) pulsar.getClient()).getLookup();
@Cleanup HttpLookupService lookup = new HttpLookupService(newConf(pulsar), eventExecutors);
@Cleanup LegacyHttpLookupService legacyLookup = new LegacyHttpLookupService(pulsar, eventExecutors);
String topic = "persistent://prop/ns-abc/nonexistent-topic";
try {
assertEquals(legacyLookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 0);
} catch (ExecutionException e) {
assertFalse(legacy);
assertTrue(e.getCause() instanceof PulsarClientException.NotFoundException);
}
try {
assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 0);
} catch (PulsarAdminException e) {
assertFalse(legacy);
assertTrue(e instanceof PulsarAdminException.NotFoundException);
}
assertEquals(lookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 0);
assertEquals(binaryLookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 0);

topic = "persistent://prop/ns-abc/non-partitioned-topic";
admin.topics().createNonPartitionedTopic(topic);
assertEquals(legacyLookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 0);
assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 0);
assertEquals(lookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 0);
assertEquals(binaryLookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 0);

topic = "persistent://prop/ns-abc/partitioned-topic";
admin.topics().createPartitionedTopic(topic, 1);
assertEquals(legacyLookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 1);
assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 1);
assertEquals(lookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 1);
assertEquals(binaryLookup.getPartitionedTopicMetadata(TopicName.get(topic)).get().partitions, 1);
}

private static ClientConfigurationData newConf(PulsarService pulsar) {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl(pulsar.getWebServiceAddress());
return conf;
}

private static class LegacyHttpLookupService extends HttpLookupService {

public LegacyHttpLookupService(PulsarService pulsar, EventLoopGroup eventLoopGroup)
throws PulsarClientException {
super(newConf(pulsar), eventLoopGroup);
}

@Override
public boolean checkAllowTopicCreation() {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -111,10 +112,16 @@ public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(T
});
}

@VisibleForTesting
protected boolean checkAllowTopicCreation() {
return true;
}

@Override
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions";
return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true",
return httpClient.get(String.format(format, topicName.getLookupName())
+ (checkAllowTopicCreation() ? "?checkAllowAutoCreation=true" : ""),
PartitionedTopicMetadata.class);
}

Expand Down