diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index cc3938491e637..5488d5563f607 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -37,6 +37,9 @@ import javax.annotation.Nonnull; import org.apache.commons.lang3.concurrent.ConcurrentInitializer; import org.apache.commons.lang3.concurrent.LazyInitializer; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; @@ -267,37 +270,33 @@ public CompletableFuture> getTopicPoliciesAsync(TopicNam return CompletableFuture.completedFuture(Optional.empty()); } final CompletableFuture preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject()); - final var resultFuture = new CompletableFuture>(); - preparedFuture.thenAccept(inserted -> policyCacheInitMap.compute(namespace, (___, existingFuture) -> { - if (!inserted || existingFuture != null) { - final var partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); - final var policies = Optional.ofNullable(switch (type) { - case DEFAULT -> Optional.ofNullable(policiesCache.get(partitionedTopicName)) - .orElseGet(() -> globalPoliciesCache.get(partitionedTopicName)); - case GLOBAL_ONLY -> globalPoliciesCache.get(partitionedTopicName); - case LOCAL_ONLY -> policiesCache.get(partitionedTopicName); - }); - resultFuture.complete(policies); - } else { - CompletableFuture.runAsync(() -> { - log.info("The future of {} has been removed from cache, retry getTopicPolicies again", namespace); - // Call it in another thread to avoid recursive update because getTopicPoliciesAsync() could call - // policyCacheInitMap.computeIfAbsent() - getTopicPoliciesAsync(topicName, type).whenComplete((result, e) -> { - if (e == null) { - resultFuture.complete(result); - } else { - resultFuture.completeExceptionally(e); - } + // switch thread to avoid potential metadata thread cost and recursive deadlock + return preparedFuture.thenComposeAsync(inserted -> { + // initialized : policies + final Mutable>> policiesFutureHolder = new MutableObject<>(); + // NOTICE: avoid using any callback with lock scope to avoid deadlock + policyCacheInitMap.compute(namespace, (___, existingFuture) -> { + if (!inserted || existingFuture != null) { + final var partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); + final var policies = Optional.ofNullable(switch (type) { + case DEFAULT -> Optional.ofNullable(policiesCache.get(partitionedTopicName)) + .orElseGet(() -> globalPoliciesCache.get(partitionedTopicName)); + case GLOBAL_ONLY -> globalPoliciesCache.get(partitionedTopicName); + case LOCAL_ONLY -> policiesCache.get(partitionedTopicName); }); - }); + policiesFutureHolder.setValue(Pair.of(true, policies)); + } else { + policiesFutureHolder.setValue(Pair.of(false, null)); + } + return existingFuture; + }); + final var p = policiesFutureHolder.getValue(); + if (!p.getLeft()) { + log.info("The future of {} has been removed from cache, retry getTopicPolicies again", namespace); + return getTopicPoliciesAsync(topicName, type); } - return existingFuture; - })).exceptionally(e -> { - resultFuture.completeExceptionally(e); - return null; + return CompletableFuture.completedFuture(p.getRight()); }); - return resultFuture; } public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {