Skip to content

Commit

Permalink
[fix][broker] Fix namespace bundle stuck in unloading status (#21445)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Nov 13, 2023
1 parent 9ccb3ac commit 177d67f
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 169 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.MutablePair;
Expand All @@ -43,10 +43,8 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.PulsarEvent;
Expand Down Expand Up @@ -314,7 +312,7 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
requireNonNull(namespace);
return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
createSystemTopicClient(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
final CompletableFuture<Void> initFuture = readerCompletableFuture
Expand All @@ -340,20 +338,16 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
});
}

protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClientWithRetry(
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClient(
NamespaceName namespace) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>();
try {
createSystemTopicFactoryIfNeeded();
} catch (PulsarServerException e) {
result.completeExceptionally(e);
return result;
} catch (PulsarServerException ex) {
return FutureUtil.failedFuture(ex);
}
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
final SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(namespace);
Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
RetryUtil.retryAsynchronously(systemTopicClient::newReaderAsync, backoff, pulsarService.getExecutor(), result);
return result;
return systemTopicClient.newReaderAsync();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
});
final String topicPoliciesServiceInitException
= "Topic creation encountered an exception by initialize topic policies service";

// Creating a producer and creating a Consumer may trigger automatic topic
// creation, let's try to create a Producer and a Consumer
Expand All @@ -146,7 +148,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic)));
assertTrue(expected.getMessage().contains(String.format(msg, topic))
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}

try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
Expand All @@ -156,7 +159,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic)));
assertTrue(expected.getMessage().contains(String.format(msg, topic))
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@

import com.google.common.collect.Sets;

import lombok.Cleanup;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -34,6 +38,9 @@ public class NamespaceUnloadingTest extends BrokerTestBase {
@BeforeMethod
@Override
protected void setup() throws Exception {
conf.setTopicLevelPoliciesEnabled(true);
conf.setForceDeleteNamespaceAllowed(true);
conf.setTopicLoadTimeoutSeconds(Integer.MAX_VALUE);
super.baseSetup();
}

Expand Down Expand Up @@ -68,4 +75,26 @@ public void testUnloadPartiallyLoadedNamespace() throws Exception {
producer.close();
}

@Test
public void testUnloadWithTopicCreation() throws PulsarAdminException, PulsarClientException {
final String namespaceName = "prop/ns_unloading";
final String topicName = "persistent://prop/ns_unloading/with_topic_creation";
final int partitions = 5;
admin.namespaces().createNamespace(namespaceName, 1);
admin.topics().createPartitionedTopic(topicName, partitions);
@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topicName)
.create();

for (int i = 0; i < 100; i++) {
admin.namespaces().unloadNamespaceBundle(namespaceName, "0x00000000_0xffffffff");
}

for (int i = 0; i < partitions; i++) {
producer.send(i);
}
admin.namespaces().deleteNamespace(namespaceName, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {

// try to create topic which should fail as bundle is disable
CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService()
.loadOrCreatePersistentTopic(topicName, true, null);
.loadOrCreatePersistentTopic(topicName, true, null, null);

try {
futureResult.get();
Expand Down Expand Up @@ -1140,7 +1140,7 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex
for (int i = 0; i < 10; i++) {
// try to create topic which should fail as bundle is disable
CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService()
.loadOrCreatePersistentTopic(topicName + "_" + i, false, null);
.loadOrCreatePersistentTopic(topicName + "_" + i, false, null, null);
loadFutures.add(futureResult);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
*/
package org.apache.pulsar.broker.service;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
Expand All @@ -42,11 +39,8 @@
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.common.events.PulsarEvent;
Expand All @@ -55,7 +49,6 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
Expand Down Expand Up @@ -322,28 +315,6 @@ public void testGetPolicyTimeout() throws Exception {
assertTrue("actual:" + cost, cost >= 5000 - 1000);
}

@Test
public void testCreatSystemTopicClientWithRetry() throws Exception {
SystemTopicBasedTopicPoliciesService service =
spy((SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService());
Field field = SystemTopicBasedTopicPoliciesService.class
.getDeclaredField("namespaceEventsSystemTopicFactory");
field.setAccessible(true);
NamespaceEventsSystemTopicFactory factory = spy((NamespaceEventsSystemTopicFactory) field.get(service));
SystemTopicClient<PulsarEvent> client = mock(TopicPoliciesSystemTopicClient.class);
doReturn(client).when(factory).createTopicPoliciesSystemTopicClient(any());
field.set(service, factory);

SystemTopicClient.Reader<PulsarEvent> reader = mock(SystemTopicClient.Reader.class);
// Throw an exception first, create successfully after retrying
doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
.doReturn(CompletableFuture.completedFuture(reader)).when(client).newReaderAsync();

SystemTopicClient.Reader<PulsarEvent> reader1 = service.createSystemTopicClientWithRetry(null).get();

assertEquals(reader1, reader);
}

@Test
public void testGetTopicPoliciesWithRetry() throws Exception {
Field initMapField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void testNonDurableSubscribe() throws Exception {
properties.setProperty("useTls", "false");

final String topicName = getTopicWithRandomSuffix("non-durable");
admin.topics().createNonPartitionedTopic(topicName);

int numberOfMessages = 10;
@Cleanup("shutdownNow")
Expand Down Expand Up @@ -193,13 +194,64 @@ public void testDurableSubscribe() throws Exception {
}
}

@Test(timeOut = 20000)
public void testRead() throws Exception {
Properties properties = new Properties();
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");

final String topicName = getTopicWithRandomSuffix("reader");
admin.topics().createNonPartitionedTopic(topicName);

int numberOfMessages = 10;
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = new CompletableFuture<>();
executor.execute(() -> {
try {
PulsarClientTool pulsarClientToolReader = new PulsarClientTool(properties);
String[] args = {"read", "-m", "latest", "-n", Integer.toString(numberOfMessages), "--hex", "-r", "30",
topicName};
assertEquals(pulsarClientToolReader.run(args), 0);
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
});

// Make sure subscription has been created
retryStrategically((test) -> {
try {
return admin.topics().getSubscriptions(topicName).size() == 1;
} catch (Exception e) {
return false;
}
}, 10, 500);

assertEquals(admin.topics().getSubscriptions(topicName).size(), 1);
assertTrue(admin.topics().getSubscriptions(topicName).get(0).startsWith("reader-"));
PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties);

String[] args = {"produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r",
"20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName};
assertEquals(pulsarClientToolProducer.run(args), 0);
assertFalse(future.isCompletedExceptionally());
future.get();

Awaitility.await()
.ignoreExceptions()
.atMost(Duration.ofMillis(20000))
.until(()->admin.topics().getSubscriptions(topicName).size() == 0);
}

@Test(timeOut = 20000)
public void testEncryption() throws Exception {
Properties properties = new Properties();
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");

final String topicName = getTopicWithRandomSuffix("encryption");
admin.topics().createNonPartitionedTopic(topicName);
final String keyUriBase = "file:../pulsar-broker/src/test/resources/certificate/";
final int numberOfMessages = 10;

Expand Down

0 comments on commit 177d67f

Please sign in to comment.