diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index aed023ee5f52d..9e0ff2cca77cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1835,7 +1835,7 @@ public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) { TopicName topicName = TopicName.get(topic); if (serviceUnit.includes(topicName) && getTopicReference(topic).isPresent()) { log.info("[{}][{}] Clean unloaded topic from cache.", serviceUnit.toString(), topic); - pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit); + pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit, null); } } } @@ -1844,15 +1844,56 @@ public AuthorizationService getAuthorizationService() { return authorizationService; } - public CompletableFuture removeTopicFromCache(String topic) { + public CompletableFuture removeTopicFromCache(String topicName) { + return removeTopicFutureFromCache(topicName, null); + } + + public CompletableFuture removeTopicFromCache(Topic topic) { + Optional>> createTopicFuture = findTopicFutureInCache(topic); + if (createTopicFuture.isEmpty()){ + return CompletableFuture.completedFuture(null); + } + return removeTopicFutureFromCache(topic.getName(), createTopicFuture.get()); + } + + private Optional>> findTopicFutureInCache(Topic topic){ + if (topic == null){ + return Optional.empty(); + } + final CompletableFuture> createTopicFuture = topics.get(topic.getName()); + // If not exists in cache, do nothing. + if (createTopicFuture == null){ + return Optional.empty(); + } + // If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic. + if (!createTopicFuture.isDone()){ + return Optional.empty(); + } + // If the future in cache has exception complete, + // the topic instance in the cache is not the same with the topic. + if (createTopicFuture.isCompletedExceptionally()){ + return Optional.empty(); + } + Optional optionalTopic = createTopicFuture.join(); + Topic topicInCache = optionalTopic.orElse(null); + if (topicInCache == null || topicInCache != topic){ + return Optional.empty(); + } else { + return Optional.of(createTopicFuture); + } + } + + private CompletableFuture removeTopicFutureFromCache(String topic, + CompletableFuture> createTopicFuture) { TopicName topicName = TopicName.get(topic); return pulsar.getNamespaceService().getBundleAsync(topicName) .thenAccept(namespaceBundle -> { - removeTopicFromCache(topic, namespaceBundle); + removeTopicFromCache(topic, namespaceBundle, createTopicFuture); }); } - public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle) { + private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle, + CompletableFuture> createTopicFuture) { String bundleName = namespaceBundle.toString(); String namespaceName = TopicName.get(topic).getNamespaceObject().toString(); @@ -1879,7 +1920,12 @@ public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle) } } } - topics.remove(topic); + + if (createTopicFuture == null) { + topics.remove(topic); + } else { + topics.remove(topic, createTopicFuture); + } Compactor compactor = pulsar.getNullableCompactor(); if (compactor != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 0c2823a131a8f..35b900c297cba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -408,7 +408,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c // topic GC iterates over topics map and removing from the map with the same thread creates // deadlock. so, execute it in different thread brokerService.executor().execute(() -> { - brokerService.removeTopicFromCache(topic); + brokerService.removeTopicFromCache(NonPersistentTopic.this); log.info("[{}] Topic deleted", topic); deleteFuture.complete(null); }); @@ -474,7 +474,7 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect // unload topic iterates over topics map and removing from the map with the same thread creates deadlock. // so, execute it in different thread brokerService.executor().execute(() -> { - brokerService.removeTopicFromCache(topic); + brokerService.removeTopicFromCache(NonPersistentTopic.this); closeFuture.complete(null); }); }).exceptionally(exception -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 42d876d475525..eec19dfa1b838 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1183,7 +1183,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(topic); + brokerService.removeTopicFromCache(PersistentTopic.this); dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); @@ -1283,7 +1283,7 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect @Override public void closeComplete(Object ctx) { // Everything is now closed, remove the topic from map - brokerService.removeTopicFromCache(topic) + brokerService.removeTopicFromCache(PersistentTopic.this) .thenRun(() -> { replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close); @@ -1305,7 +1305,7 @@ public void closeComplete(Object ctx) { @Override public void closeFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); - brokerService.removeTopicFromCache(topic); + brokerService.removeTopicFromCache(PersistentTopic.this); unregisterTopicPolicyListener(); closeFuture.complete(null); }