Skip to content

Commit

Permalink
[fix][broker] Fix a deadlock in SystemTopicBasedTopicPoliciesService …
Browse files Browse the repository at this point in the history
…during NamespaceEventsSystemTopicFactory init (apache#22528)
  • Loading branch information
heesung-sn authored Apr 18, 2024
1 parent d0b9d47 commit 72474d7
Showing 1 changed file with 32 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -70,7 +72,19 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
private final PulsarService pulsarService;
private final HashSet localCluster;
private final String clusterName;
private volatile NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;

private final ConcurrentInitializer<NamespaceEventsSystemTopicFactory>
namespaceEventsSystemTopicFactoryLazyInitializer = new LazyInitializer<>() {
@Override
protected NamespaceEventsSystemTopicFactory initialize() {
try {
return new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
} catch (PulsarServerException e) {
log.error("Create namespace event system topic factory error.", e);
throw new RuntimeException(e);
}
}
};

@VisibleForTesting
final Map<TopicName, TopicPolicies> policiesCache = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -102,7 +116,7 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
});
})
.buildAsync((namespaceName, executor) -> {
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
SystemTopicClient<PulsarEvent> systemTopicClient = getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(namespaceName);
return systemTopicClient.newWriterAsync();
});
Expand Down Expand Up @@ -301,7 +315,7 @@ public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicNa
result.complete(null);
return result;
}
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
SystemTopicClient<PulsarEvent> systemTopicClient = getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
systemTopicClient.newReaderAsync().thenAccept(r ->
fetchTopicPoliciesAsyncAndCloseReader(r, topicName, null, result));
Expand Down Expand Up @@ -373,7 +387,7 @@ protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemT
} catch (PulsarServerException ex) {
return FutureUtil.failedFuture(ex);
}
final SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
final SystemTopicClient<PulsarEvent> systemTopicClient = getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(namespace);
return systemTopicClient.newReaderAsync();
}
Expand Down Expand Up @@ -561,7 +575,7 @@ private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
log.error("Failed to create system topic factory");
break;
}
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
SystemTopicClient<PulsarEvent> systemTopicClient = getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
systemTopicClient.newWriterAsync().thenAccept(writer
-> writer.deleteAsync(getEventKey(topicName),
Expand Down Expand Up @@ -595,18 +609,19 @@ private boolean hasReplicateTo(Message<?> message) {
}

private void createSystemTopicFactoryIfNeeded() throws PulsarServerException {
if (namespaceEventsSystemTopicFactory == null) {
synchronized (this) {
if (namespaceEventsSystemTopicFactory == null) {
try {
namespaceEventsSystemTopicFactory =
new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
} catch (PulsarServerException e) {
log.error("Create namespace event system topic factory error.", e);
throw e;
}
}
}
try {
getNamespaceEventsSystemTopicFactory();
} catch (Exception e) {
throw new PulsarServerException(e);
}
}

private NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() {
try {
return namespaceEventsSystemTopicFactoryLazyInitializer.get();
} catch (Exception e) {
log.error("Create namespace event system topic factory error.", e);
throw new RuntimeException(e);
}
}

Expand Down

0 comments on commit 72474d7

Please sign in to comment.