diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index f85e3d2dc7562..154a0ec0c4fd8 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -57,7 +57,15 @@ public class OxiaMetadataStore extends AbstractMetadataStore { private final String identity; private Optional synchronizer; - OxiaMetadataStore( + public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) { + super("oxia-metadata"); + this.client = oxia; + this.identity = identity; + this.synchronizer = Optional.empty(); + init(); + } + + public OxiaMetadataStore( @NonNull String serviceAddress, @NonNull String namespace, @NonNull MetadataStoreConfig metadataStoreConfig, @@ -69,7 +77,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { if (!metadataStoreConfig.isBatchingEnabled()) { linger = 0; } - updateMetadataEventSynchronizer(metadataStoreConfig.getSynchronizer()); + synchronizer = Optional.ofNullable(metadataStoreConfig.getSynchronizer()); identity = UUID.randomUUID().toString(); client = OxiaClientBuilder.create(serviceAddress) @@ -80,8 +88,14 @@ public class OxiaMetadataStore extends AbstractMetadataStore { .maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations()) .asyncClient() .get(); + init(); + } + + private void init() { + updateMetadataEventSynchronizer(synchronizer.orElse(null)); + client.notifications(this::notificationCallback); - super.registerSyncListener(Optional.ofNullable(metadataStoreConfig.getSynchronizer())); + super.registerSyncListener(synchronizer); } private void notificationCallback(Notification notification) {