diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index c0c296371146d..dab42558d01fa 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -98,6 +98,7 @@ public static MetadataStore createMetadataStore(Configuration conf) throws Metad int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout")); store = MetadataStoreExtended.create(url, MetadataStoreConfig.builder() + .metadataStoreName(MetadataStoreConfig.METADATA_STORE) .sessionTimeoutMillis(zkTimeout) .build()); } catch (MetadataStoreException e) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java index a087d8090d3a4..9cfeddd123d80 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java @@ -88,9 +88,17 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore); } - public static MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs) + public static MetadataStoreExtended createLocalMetadataStore(String serverUrls, int sessionTimeoutMs) throws MetadataStoreException { return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder() - .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false).build()); + .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false) + .metadataStoreName(MetadataStoreConfig.METADATA_STORE).build()); + } + + public static MetadataStoreExtended createConfigMetadataStore(String serverUrls, int sessionTimeoutMs) + throws MetadataStoreException { + return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder() + .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false) + .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index 9d7aac38a5c63..bebe7ab42f111 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -247,8 +247,8 @@ private static void initializeCluster(Arguments arguments) throws Exception { arguments.metadataStoreUrl, arguments.configurationMetadataStore); MetadataStoreExtended localStore = - initMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis); - MetadataStoreExtended configStore = initMetadataStore(arguments.configurationMetadataStore, + initLocalMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis); + MetadataStoreExtended configStore = initConfigMetadataStore(arguments.configurationMetadataStore, arguments.zkSessionTimeoutMillis); final String metadataStoreUrlNoIdentifer = MetadataStoreFactoryImpl @@ -388,9 +388,22 @@ static void createPartitionedTopic(MetadataStore configStore, TopicName topicNam } } - public static MetadataStoreExtended initMetadataStore(String connection, int sessionTimeout) throws Exception { + public static MetadataStoreExtended initLocalMetadataStore(String connection, int sessionTimeout) throws Exception { MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder() .sessionTimeoutMillis(sessionTimeout) + .metadataStoreName(MetadataStoreConfig.METADATA_STORE) + .build()); + if (store instanceof MetadataStoreLifecycle) { + ((MetadataStoreLifecycle) store).initializeCluster().get(); + } + return store; + } + + public static MetadataStoreExtended initConfigMetadataStore(String connection, int sessionTimeout) + throws Exception { + MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder() + .sessionTimeoutMillis(sessionTimeout) + .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE) .build()); if (store instanceof MetadataStoreLifecycle) { ((MetadataStoreLifecycle) store).initializeCluster().get(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java index d0da6a5a2bd5d..d0ba5c47f7502 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java @@ -99,7 +99,10 @@ public static void main(String[] args) throws Exception { @Cleanup MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.zookeeper, - MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build()); + MetadataStoreConfig.builder() + .sessionTimeoutMillis(arguments.zkSessionTimeoutMillis) + .metadataStoreName(MetadataStoreConfig.METADATA_STORE) + .build()); if (arguments.bkMetadataServiceUri != null) { @Cleanup @@ -121,7 +124,8 @@ public static void main(String[] args) throws Exception { // Should it be done by REST API before broker is down? @Cleanup MetadataStore configMetadataStore = MetadataStoreFactory.create(arguments.configurationStore, - MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build()); + MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis) + .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build()); deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java index 15e4792fd97dc..bb27be60b3512 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java @@ -83,7 +83,7 @@ public static int doMain(String[] args) throws Exception { } try (MetadataStore configStore = PulsarClusterMetadataSetup - .initMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) { + .initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) { PulsarResources pulsarResources = new PulsarResources(null, configStore); for (String namespace : arguments.namespaces) { NamespaceName namespaceName = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java index 66607dd4c0a41..b1fae8753ec10 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java @@ -91,7 +91,7 @@ public static void main(String[] args) throws Exception { } try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup - .initMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) { + .initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) { PulsarResources pulsarResources = new PulsarResources(null, configStore); // Create system tenant PulsarClusterMetadataSetup diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6d0e2a42f6697..c41cc4685622a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -362,6 +362,7 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro .batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis()) .batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations()) .batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb()) + .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE) .synchronizer(synchronizer) .build()); } @@ -1043,6 +1044,7 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro .batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations()) .batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb()) .synchronizer(synchronizer) + .metadataStoreName(MetadataStoreConfig.METADATA_STORE) .build()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java index a54135de9a8f4..4cb1fd347df13 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java @@ -143,6 +143,7 @@ public static void main(String[] args) throws Exception { MetadataStoreExtended store = MetadataStoreExtended.create(brokerConfig.getMetadataStoreUrl(), MetadataStoreConfig.builder() .sessionTimeoutMillis((int) brokerConfig.getMetadataStoreSessionTimeoutMillis()) + .metadataStoreName(MetadataStoreConfig.METADATA_STORE) .build()); @Cleanup diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 2ca1e1f986d92..0105aa483d312 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -380,21 +380,27 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception { } protected MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer) { - return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder().synchronizer(synchronizer).build()); + return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder() + .metadataStoreName(MetadataStoreConfig.METADATA_STORE) + .synchronizer(synchronizer).build()); } protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException { - return new ZKMetadataStore(mockZooKeeper); + return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder() + .metadataStoreName(MetadataStoreConfig.METADATA_STORE).build()); } protected MetadataStoreExtended createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer) { return new ZKMetadataStore(mockZooKeeperGlobal, - MetadataStoreConfig.builder().synchronizer(synchronizer).build()); + MetadataStoreConfig.builder() + .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE) + .synchronizer(synchronizer).build()); } protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException { - return new ZKMetadataStore(mockZooKeeperGlobal); + return new ZKMetadataStore(mockZooKeeperGlobal, MetadataStoreConfig.builder() + .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build()); } private void mockConfigBrokerInterceptors(PulsarService pulsarService) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java new file mode 100644 index 0000000000000..eba134a2c8dfe --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import com.google.common.collect.Multimap; +import java.io.ByteArrayOutputStream; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +@Test(groups = "broker") +public class MetadataStoreStatsTest extends BrokerTestBase { + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + conf.setTopicLevelPoliciesEnabled(false); + conf.setSystemTopicEnabled(false); + super.baseSetup(); + AuthenticationProviderToken.resetMetrics(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + resetConfig(); + } + + @Test + public void testMetadataStoreStats() throws Exception { + String ns = "prop/ns-abc1"; + admin.namespaces().createNamespace(ns); + + String topic = "persistent://prop/ns-abc1/metadata-store-" + UUID.randomUUID(); + String subName = "my-sub1"; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic).create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic).subscriptionName(subName).subscribe(); + + for (int i = 0; i < 100; i++) { + producer.newMessage().value(UUID.randomUUID().toString()).send(); + } + + for (;;) { + Message message = consumer.receive(10, TimeUnit.SECONDS); + if (message == null) { + break; + } + consumer.acknowledge(message); + } + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output); + String metricsStr = output.toString(); + Multimap metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr); + + Collection opsLatency = metricsMap.get("pulsar_metadata_store_ops_latency_ms" + "_sum"); + Collection putBytes = metricsMap.get("pulsar_metadata_store_put_bytes" + "_total"); + + Assert.assertTrue(opsLatency.size() > 1); + Assert.assertTrue(putBytes.size() > 1); + + for (PrometheusMetricsTest.Metric m : opsLatency) { + Assert.assertEquals(m.tags.get("cluster"), "test"); + String metadataStoreName = m.tags.get("name"); + Assert.assertNotNull(metadataStoreName); + Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE) + || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE) + || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE)); + Assert.assertNotNull(m.tags.get("status")); + + if (m.tags.get("status").equals("success")) { + if (m.tags.get("type").equals("get")) { + Assert.assertTrue(m.value >= 0); + } else if (m.tags.get("type").equals("del")) { + Assert.assertTrue(m.value >= 0); + } else if (m.tags.get("type").equals("put")) { + Assert.assertTrue(m.value >= 0); + } else { + Assert.fail(); + } + } else { + if (m.tags.get("type").equals("get")) { + Assert.assertTrue(m.value >= 0); + } else if (m.tags.get("type").equals("del")) { + Assert.assertTrue(m.value >= 0); + } else if (m.tags.get("type").equals("put")) { + Assert.assertTrue(m.value >= 0); + } else { + Assert.fail(); + } + } + } + for (PrometheusMetricsTest.Metric m : putBytes) { + Assert.assertEquals(m.tags.get("cluster"), "test"); + String metadataStoreName = m.tags.get("name"); + Assert.assertNotNull(metadataStoreName); + Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE) + || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE) + || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE)); + Assert.assertTrue(m.value > 0); + } + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java index 4f7c2f275454f..28bc0cbbb3a4a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java @@ -251,7 +251,7 @@ public void testSetupWithBkMetadataServiceUri() throws Exception { PulsarClusterMetadataSetup.main(args); try (MetadataStoreExtended localStore = PulsarClusterMetadataSetup - .initMetadataStore(zkConnection, 30000)) { + .initLocalMetadataStore(zkConnection, 30000)) { // expected not exist assertFalse(localStore.exists("/ledgers").get()); @@ -268,7 +268,7 @@ public void testSetupWithBkMetadataServiceUri() throws Exception { PulsarClusterMetadataSetup.main(bookkeeperMetadataServiceUriArgs); try (MetadataStoreExtended bookkeeperMetadataServiceUriStore = PulsarClusterMetadataSetup - .initMetadataStore(zkConnection, 30000)) { + .initLocalMetadataStore(zkConnection, 30000)) { // expected not exist assertFalse(bookkeeperMetadataServiceUriStore.exists("/ledgers").get()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java index a34ec879ba609..35a7aa44391ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java @@ -83,7 +83,7 @@ public void setup() throws Exception { } service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index 7e0ee1bd4669e..982f4cd48ebe0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -64,7 +64,7 @@ protected void setup() throws Exception { config.setWebServicePort(Optional.of(0)); config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt()); service.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java index 26cf8a0e1549f..e6eba5e32cb66 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java @@ -67,7 +67,7 @@ public void configTest(int numIoThreads, int connectionsPerBroker) throws Except config.setServiceUrl("http://localhost:8080"); config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100"); WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt()); service.start(); PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java index bbd2b3bd14fdc..87741e5bede1d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java @@ -79,7 +79,7 @@ public void setup() throws Exception { config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); config.setCryptoKeyReaderFactoryClassName(CryptoKeyReaderFactoryImpl.class.getName()); WebSocketService service = spy(new WebSocketService(config)); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 918640642ecbd..951a3db4f0291 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -101,7 +101,7 @@ public void setup() throws Exception { config.setClusterName("test"); config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java index a8b67416107c7..5e40a7535a7bc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java @@ -74,7 +74,7 @@ public void setup() throws Exception { config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java index 1fb12645e5e34..c4f4e876c69db 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java @@ -62,7 +62,7 @@ public void setup() throws Exception { config.setServiceUrl(pulsar.getSafeWebServiceAddress()); config.setServiceUrlTls(pulsar.getWebServiceAddressTls()); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeper)).when(service).createConfigMetadataStore(anyString(), anyInt()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java index b80c3fb07be5e..09a7d3a90fdbd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java @@ -85,7 +85,7 @@ public void setup() throws Exception { } service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java index 819bfd94cb7a5..0fcc4c56b400a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java @@ -47,7 +47,8 @@ public void init(Map config, Function.FunctionDetails functionDe shouldCloseStore = false; } else { String metadataUrl = (String) config.get(METADATA_URL); - store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder().build()); + store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder() + .metadataStoreName(MetadataStoreConfig.STATE_METADATA_STORE).build()); shouldCloseStore = true; } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 0ff6dbc0431ec..d4d4873737d69 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -74,7 +74,7 @@ private AuthorizationService getAuthorizationService() throws PulsarServerExcept log.info("starting configuration cache service"); try { - configMetadataStore = PulsarResources.createMetadataStore( + configMetadataStore = PulsarResources.createConfigMetadataStore( workerConfig.getConfigurationMetadataStoreUrl(), (int) workerConfig.getMetadataStoreSessionTimeoutMillis()); } catch (IOException e) { diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml index 935fc878a628c..b6d33593abf59 100644 --- a/pulsar-metadata/pom.xml +++ b/pulsar-metadata/pom.xml @@ -99,6 +99,11 @@ caffeine + + io.prometheus + simpleclient + + diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java index e1d0ede7d2a7b..f00cc4314807a 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java @@ -29,6 +29,9 @@ @Getter @ToString public class MetadataStoreConfig { + public static final String METADATA_STORE = "metadata-store"; + public static final String STATE_METADATA_STORE = "state-metadata-store"; + public static final String CONFIGURATION_METADATA_STORE = "configuration-metadata-store"; /** * The (implementation specific) session timeout, in milliseconds. @@ -72,6 +75,12 @@ public class MetadataStoreConfig { @Builder.Default private final int batchingMaxSizeKb = 128; + /** + * The name of a metadata store. + */ + @Builder.Default + private final String metadataStoreName = ""; + /** * Pluggable MetadataEventSynchronizer to sync metadata events across the * separate clusters. diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java index 76a14300d0b30..8a9984d626dd7 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java @@ -115,6 +115,7 @@ void createMetadataStore() throws MetadataException { this.store = MetadataStoreExtended.create(url, MetadataStoreConfig.builder() .sessionTimeoutMillis(conf.getZkTimeout()) + .metadataStoreName(MetadataStoreConfig.METADATA_STORE) .build()); this.storeInstanceIsOwned = true; } catch (MetadataStoreException e) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java index 36b0be112e2d1..24fa473085400 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java @@ -119,7 +119,8 @@ private BKCluster(BKClusterConf bkClusterConf) throws Exception { this.baseClientConf = newBaseClientConfiguration(); this.store = - MetadataStoreExtended.create(clusterConf.metadataServiceUri, MetadataStoreConfig.builder().build()); + MetadataStoreExtended.create(clusterConf.metadataServiceUri, MetadataStoreConfig.builder() + .metadataStoreName(MetadataStoreConfig.METADATA_STORE).build()); baseConf.setJournalRemovePagesFromCache(false); baseConf.setProperty(AbstractMetadataDriver.METADATA_STORE_INSTANCE, store); baseClientConf.setProperty(AbstractMetadataDriver.METADATA_STORE_INSTANCE, store); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 8986811ad9f71..d07fe9ef9e731 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -35,9 +35,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -58,18 +58,20 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.api.extended.SessionEvent; import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; +import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats; @Slf4j public abstract class AbstractMetadataStore implements MetadataStoreExtended, Consumer { - private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5); private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList> sessionListeners = new CopyOnWriteArrayList<>(); + protected final String metadataStoreName; protected final ScheduledExecutorService executor; private final AsyncLoadingCache> childrenCache; private final AsyncLoadingCache existsCache; private final CopyOnWriteArrayList> metadataCaches = new CopyOnWriteArrayList<>(); + private final MetadataStoreStats metadataStoreStats; // We don't strictly need to use 'volatile' here because we don't need the precise consistent semantic. Instead, // we want to avoid the overhead of 'volatile'. @@ -80,9 +82,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co protected abstract CompletableFuture existsFromStore(String path); - protected AbstractMetadataStore() { - this.executor = Executors - .newSingleThreadScheduledExecutor(new DefaultThreadFactory("metadata-store")); + protected AbstractMetadataStore(String metadataStoreName) { + this.executor = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory(metadataStoreName)); registerListener(this); this.childrenCache = Caffeine.newBuilder() @@ -126,6 +127,9 @@ public CompletableFuture asyncReload(String key, Boolean oldValue, } } }); + + this.metadataStoreName = metadataStoreName; + this.metadataStoreStats = new MetadataStoreStats(metadataStoreName); } protected void registerSyncLister(Optional synchronizer) { @@ -235,10 +239,20 @@ public MetadataCache getMetadataCache(MetadataSerde serde) { @Override public CompletableFuture> get(String path) { + long start = System.currentTimeMillis(); if (!isValidPath(path)) { - return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); + metadataStoreStats.recordGetOpsFailed(System.currentTimeMillis() - start); + return FutureUtil + .failedFuture(new MetadataStoreException.InvalidPathException(path)); } - return storeGet(path); + return storeGet(path) + .whenComplete((v, t) -> { + if (t != null) { + metadataStoreStats.recordGetOpsFailed(System.currentTimeMillis() - start); + } else { + metadataStoreStats.recordGetOpsSucceeded(System.currentTimeMillis() - start); + } + }); } protected abstract CompletableFuture> storeGet(String path); @@ -313,7 +327,9 @@ public void accept(Notification n) { @Override public final CompletableFuture delete(String path, Optional expectedVersion) { + long start = System.currentTimeMillis(); if (!isValidPath(path)) { + metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start); return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); } if (getMetadataEventSynchronizer().isPresent()) { @@ -321,9 +337,23 @@ public final CompletableFuture delete(String path, Optional expected expectedVersion.orElse(null), Instant.now().toEpochMilli(), getMetadataEventSynchronizer().get().getClusterName(), NotificationType.Deleted); return getMetadataEventSynchronizer().get().notify(event) - .thenCompose(__ -> deleteInternal(path, expectedVersion)); + .thenCompose(__ -> deleteInternal(path, expectedVersion)) + .whenComplete((v, t) -> { + if (null != t) { + metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start); + } else { + metadataStoreStats.recordDelOpsSucceeded(System.currentTimeMillis() - start); + } + }); } else { - return deleteInternal(path, expectedVersion); + return deleteInternal(path, expectedVersion) + .whenComplete((v, t) -> { + if (null != t) { + metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start); + } else { + metadataStoreStats.recordDelOpsSucceeded(System.currentTimeMillis() - start); + } + }); } } @@ -363,7 +393,9 @@ protected abstract CompletableFuture storePut(String path, byte[] data, Op @Override public final CompletableFuture put(String path, byte[] data, Optional optExpectedVersion, EnumSet options) { + long start = System.currentTimeMillis(); if (!isValidPath(path)) { + metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start); return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); } HashSet ops = new HashSet<>(options); @@ -374,9 +406,25 @@ public final CompletableFuture put(String path, byte[] data, Optional putInternal(path, data, optExpectedVersion, options)); + .thenCompose(__ -> putInternal(path, data, optExpectedVersion, options)) + .whenComplete((v, t) -> { + if (t != null) { + metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start); + } else { + int len = data == null ? 0 : data.length; + metadataStoreStats.recordPutOpsSucceeded(System.currentTimeMillis() - start, len); + } + }); } else { - return putInternal(path, data, optExpectedVersion, options); + return putInternal(path, data, optExpectedVersion, options) + .whenComplete((v, t) -> { + if (t != null) { + metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start); + } else { + int len = data == null ? 0 : data.length; + metadataStoreStats.recordPutOpsSucceeded(System.currentTimeMillis() - start, len); + } + }); } } @@ -422,6 +470,7 @@ protected void receivedSessionEvent(SessionEvent event) { public void close() throws Exception { executor.shutdownNow(); executor.awaitTermination(10, TimeUnit.SECONDS); + this.metadataStoreStats.close(); } @VisibleForTesting diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index 924a6ac5d6d4b..b520745a40659 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -75,6 +75,7 @@ private static class Value { public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { + super(metadataStoreConfig.getMetadataStoreName()); String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length()); // Local means a private data set // update synchronizer and register sync listener diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java index 69af0e7691400..b276d3cb7a860 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java @@ -216,6 +216,7 @@ static long toLong(byte[] bytes) { */ private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { + super(metadataStoreConfig.getMetadataStoreName()); this.metadataUrl = metadataURL; try { RocksDB.loadLibrary(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index c9d245b8caf46..de7088d74afbc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -53,7 +53,7 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore private MetadataEventSynchronizer synchronizer; protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) { - super(); + super(conf.getMetadataStoreName()); this.enabled = conf.isBatchingEnabled(); this.maxDelayMillis = conf.getBatchingMaxDelayMillis(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java new file mode 100644 index 0000000000000..2351ba1a59105 --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.impl.stats; + +import io.prometheus.client.Counter; +import io.prometheus.client.Histogram; +import java.util.concurrent.atomic.AtomicBoolean; + +public final class MetadataStoreStats implements AutoCloseable { + private static final double[] BUCKETS = new double[]{1, 3, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000}; + private static final String OPS_TYPE_LABEL_NAME = "type"; + private static final String METADATA_STORE_LABEL_NAME = "name"; + private static final String STATUS = "status"; + + private static final String OPS_TYPE_GET = "get"; + private static final String OPS_TYPE_DEL = "del"; + private static final String OPS_TYPE_PUT = "put"; + private static final String STATUS_SUCCESS = "success"; + private static final String STATUS_FAIL = "fail"; + + protected static final String PREFIX = "pulsar_metadata_store_"; + + private static final Histogram OPS_LATENCY = Histogram + .build(PREFIX + "ops_latency", "-") + .unit("ms") + .buckets(BUCKETS) + .labelNames(METADATA_STORE_LABEL_NAME, OPS_TYPE_LABEL_NAME, STATUS) + .register(); + private static final Counter PUT_BYTES = Counter + .build(PREFIX + "put", "-") + .unit("bytes") + .labelNames(METADATA_STORE_LABEL_NAME) + .register(); + + private final Histogram.Child getOpsSucceedChild; + private final Histogram.Child delOpsSucceedChild; + private final Histogram.Child putOpsSucceedChild; + private final Histogram.Child getOpsFailedChild; + private final Histogram.Child delOpsFailedChild; + private final Histogram.Child putOpsFailedChild; + private final Counter.Child putBytesChild; + private final String metadataStoreName; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public MetadataStoreStats(String metadataStoreName) { + this.metadataStoreName = metadataStoreName; + + this.getOpsSucceedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_GET, STATUS_SUCCESS); + this.delOpsSucceedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_DEL, STATUS_SUCCESS); + this.putOpsSucceedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_PUT, STATUS_SUCCESS); + this.getOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_GET, STATUS_FAIL); + this.delOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_DEL, STATUS_FAIL); + this.putOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_PUT, STATUS_FAIL); + this.putBytesChild = PUT_BYTES.labels(metadataStoreName); + } + + public void recordGetOpsSucceeded(long millis) { + this.getOpsSucceedChild.observe(millis); + } + + public void recordDelOpsSucceeded(long millis) { + this.delOpsSucceedChild.observe(millis); + } + + public void recordPutOpsSucceeded(long millis, int bytes) { + this.putOpsSucceedChild.observe(millis); + this.putBytesChild.inc(bytes); + } + + public void recordGetOpsFailed(long millis) { + this.getOpsFailedChild.observe(millis); + } + + public void recordDelOpsFailed(long millis) { + this.delOpsFailedChild.observe(millis); + } + + public void recordPutOpsFailed(long millis) { + this.putOpsFailedChild.observe(millis); + } + + @Override + public void close() throws Exception { + if (this.closed.compareAndSet(false, true)) { + OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_GET, STATUS_SUCCESS); + OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_DEL, STATUS_SUCCESS); + OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_PUT, STATUS_SUCCESS); + OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_GET, STATUS_FAIL); + OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_DEL, STATUS_FAIL); + OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_PUT, STATUS_FAIL); + PUT_BYTES.remove(this.metadataStoreName); + } + } +} diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/package-info.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/package-info.java new file mode 100644 index 0000000000000..15ca0d1c58263 --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.impl.stats; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 2fb3fd6744622..07580fd7a92fe 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -440,12 +440,12 @@ public Optional getListenPortTls() { } public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException { - return PulsarResources.createMetadataStore(proxyConfig.getMetadataStoreUrl(), + return PulsarResources.createLocalMetadataStore(proxyConfig.getMetadataStoreUrl(), proxyConfig.getMetadataStoreSessionTimeoutMillis()); } public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException { - return PulsarResources.createMetadataStore(proxyConfig.getConfigurationMetadataStoreUrl(), + return PulsarResources.createConfigMetadataStore(proxyConfig.getConfigurationMetadataStoreUrl(), proxyConfig.getMetadataStoreSessionTimeoutMillis()); } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index f6a4771f9e41e..852ed7f18e867 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -74,7 +74,7 @@ public class PulsarConnectorCache { private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { this.metadataStore = MetadataStoreExtended.create(pulsarConnectorConfig.getZookeeperUri(), - MetadataStoreConfig.builder().build()); + MetadataStoreConfig.builder().metadataStoreName(MetadataStoreConfig.METADATA_STORE).build()); this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig); this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(), StatsProvider.class, getClass().getClassLoader()); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java index 73987c8b4a862..ab30e7dd4ec0c 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java @@ -193,7 +193,7 @@ public static void main(String[] args) throws Exception { @Cleanup MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.metadataStoreUrl, - MetadataStoreConfig.builder().build()); + MetadataStoreConfig.builder().metadataStoreName(MetadataStoreConfig.METADATA_STORE).build()); ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkConf, mlFactoryConf); ManagedLedgerConfig mlConf = new ManagedLedgerConfig(); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index f8b63e0641f13..7e798eecfddf7 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -106,7 +106,7 @@ public void start() throws PulsarServerException, PulsarClientException, Malform if (isNotBlank(config.getConfigurationMetadataStoreUrl())) { try { - configMetadataStore = createMetadataStore(config.getConfigurationMetadataStoreUrl(), + configMetadataStore = createConfigMetadataStore(config.getConfigurationMetadataStoreUrl(), (int) config.getMetadataStoreSessionTimeoutMillis()); } catch (MetadataStoreException e) { throw new PulsarServerException(e); @@ -140,9 +140,9 @@ public void start() throws PulsarServerException, PulsarClientException, Malform log.info("Pulsar WebSocket Service started"); } - public MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs) + public MetadataStoreExtended createConfigMetadataStore(String serverUrls, int sessionTimeoutMs) throws MetadataStoreException { - return PulsarResources.createMetadataStore(serverUrls, sessionTimeoutMs); + return PulsarResources.createConfigMetadataStore(serverUrls, sessionTimeoutMs); } @Override