diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 4e9e875bcf4c3..0449e5c885cd3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; +import org.apache.commons.lang3.concurrent.ConcurrentInitializer; +import org.apache.commons.lang3.concurrent.LazyInitializer; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -70,7 +72,19 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private final PulsarService pulsarService; private final HashSet localCluster; private final String clusterName; - private volatile NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory; + + private final ConcurrentInitializer + namespaceEventsSystemTopicFactoryLazyInitializer = new LazyInitializer<>() { + @Override + protected NamespaceEventsSystemTopicFactory initialize() { + try { + return new NamespaceEventsSystemTopicFactory(pulsarService.getClient()); + } catch (PulsarServerException e) { + log.error("Create namespace event system topic factory error.", e); + throw new RuntimeException(e); + } + } + }; @VisibleForTesting final Map policiesCache = new ConcurrentHashMap<>(); @@ -102,7 +116,7 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) { }); }) .buildAsync((namespaceName, executor) -> { - SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory + SystemTopicClient systemTopicClient = getNamespaceEventsSystemTopicFactory() .createTopicPoliciesSystemTopicClient(namespaceName); return systemTopicClient.newWriterAsync(); }); @@ -301,7 +315,7 @@ public CompletableFuture getTopicPoliciesBypassCacheAsync(TopicNa result.complete(null); return result; } - SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory + SystemTopicClient systemTopicClient = getNamespaceEventsSystemTopicFactory() .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()); systemTopicClient.newReaderAsync().thenAccept(r -> fetchTopicPoliciesAsyncAndCloseReader(r, topicName, null, result)); @@ -373,7 +387,7 @@ protected CompletableFuture> createSystemT } catch (PulsarServerException ex) { return FutureUtil.failedFuture(ex); } - final SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory + final SystemTopicClient systemTopicClient = getNamespaceEventsSystemTopicFactory() .createTopicPoliciesSystemTopicClient(namespace); return systemTopicClient.newReaderAsync(); } @@ -561,7 +575,7 @@ private void refreshTopicPoliciesCache(Message msg) { log.error("Failed to create system topic factory"); break; } - SystemTopicClient systemTopicClient = namespaceEventsSystemTopicFactory + SystemTopicClient systemTopicClient = getNamespaceEventsSystemTopicFactory() .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()); systemTopicClient.newWriterAsync().thenAccept(writer -> writer.deleteAsync(getEventKey(topicName), @@ -595,18 +609,19 @@ private boolean hasReplicateTo(Message message) { } private void createSystemTopicFactoryIfNeeded() throws PulsarServerException { - if (namespaceEventsSystemTopicFactory == null) { - synchronized (this) { - if (namespaceEventsSystemTopicFactory == null) { - try { - namespaceEventsSystemTopicFactory = - new NamespaceEventsSystemTopicFactory(pulsarService.getClient()); - } catch (PulsarServerException e) { - log.error("Create namespace event system topic factory error.", e); - throw e; - } - } - } + try { + getNamespaceEventsSystemTopicFactory(); + } catch (Exception e) { + throw new PulsarServerException(e); + } + } + + private NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() { + try { + return namespaceEventsSystemTopicFactoryLazyInitializer.get(); + } catch (Exception e) { + log.error("Create namespace event system topic factory error.", e); + throw new RuntimeException(e); } }