diff --git a/common/src/main/java/org/astraea/common/cost/utils/ClusterInfoSensor.java b/common/src/main/java/org/astraea/common/cost/utils/ClusterInfoSensor.java index 45fb61fd79..c24d0eb76d 100644 --- a/common/src/main/java/org/astraea/common/cost/utils/ClusterInfoSensor.java +++ b/common/src/main/java/org/astraea/common/cost/utils/ClusterInfoSensor.java @@ -40,7 +40,7 @@ public class ClusterInfoSensor implements MetricSensor { @Override public List fetch(MBeanClient client, ClusterBean bean) { return Stream.of( - List.of(ServerMetrics.KafkaServer.clusterId(client)), + List.of(ServerMetrics.KafkaServer.CLUSTER_ID.fetch(client)), LogMetrics.Log.SIZE.fetch(client), ClusterMetrics.Partition.REPLICAS_COUNT.fetch(client)) .flatMap(Collection::stream) diff --git a/common/src/main/java/org/astraea/common/metrics/broker/ClusterMetrics.java b/common/src/main/java/org/astraea/common/metrics/broker/ClusterMetrics.java index 9e0b5bad99..1292b24901 100644 --- a/common/src/main/java/org/astraea/common/metrics/broker/ClusterMetrics.java +++ b/common/src/main/java/org/astraea/common/metrics/broker/ClusterMetrics.java @@ -16,7 +16,11 @@ */ package org.astraea.common.metrics.broker; +import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; import org.astraea.common.EnumInfo; import org.astraea.common.admin.TopicPartition; @@ -28,9 +32,25 @@ public final class ClusterMetrics { public static final String DOMAIN_NAME = "kafka.cluster"; + public static final Collection QUERIES = List.copyOf(Partition.ALL.values()); + public enum Partition implements EnumInfo { REPLICAS_COUNT("ReplicasCount"); + private static final Map ALL = + Arrays.stream(Partition.values()) + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), + m -> + BeanQuery.builder() + .domainName(DOMAIN_NAME) + .property("type", "Partition") + .property("topic", "*") + .property("partition", "*") + .property("name", m.metricName()) + .build())); + private final String metricName; Partition(String metricName) { @@ -55,18 +75,8 @@ public static ClusterMetrics.Partition ofAlias(String alias) { return EnumInfo.ignoreCaseEnum(ClusterMetrics.Partition.class, alias); } - public BeanQuery query() { - return BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", "Partition") - .property("topic", "*") - .property("partition", "*") - .property("name", metricName()) - .build(); - } - public List fetch(MBeanClient client) { - return client.beans(query()).stream() + return client.beans(ALL.get(this)).stream() .map(PartitionMetric::new) .collect(Collectors.toUnmodifiableList()); } diff --git a/common/src/main/java/org/astraea/common/metrics/broker/LogMetrics.java b/common/src/main/java/org/astraea/common/metrics/broker/LogMetrics.java index 57d2dd38e3..3d9ed768af 100644 --- a/common/src/main/java/org/astraea/common/metrics/broker/LogMetrics.java +++ b/common/src/main/java/org/astraea/common/metrics/broker/LogMetrics.java @@ -16,10 +16,13 @@ */ package org.astraea.common.metrics.broker; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.astraea.common.EnumInfo; import org.astraea.common.metrics.BeanObject; import org.astraea.common.metrics.BeanQuery; @@ -30,10 +33,28 @@ public final class LogMetrics { public static final String DOMAIN_NAME = "kafka.log"; public static final String LOG_TYPE = "Log"; + public static final Collection QUERIES = + Stream.of(LogCleanerManager.ALL.values().stream(), Log.ALL.values().stream()) + .flatMap(f -> f) + .collect(Collectors.toUnmodifiableList()); + public enum LogCleanerManager implements EnumInfo { UNCLEANABLE_BYTES("uncleanable-bytes"), UNCLEANABLE_PARTITIONS_COUNT("uncleanable-partitions-count"); + private static final Map ALL = + Arrays.stream(LogCleanerManager.values()) + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), + m -> + BeanQuery.builder() + .domainName(DOMAIN_NAME) + .property("type", "LogCleanerManager") + .property("logDirectory", "*") + .property("name", m.metricName) + .build())); + public static LogCleanerManager ofAlias(String alias) { return EnumInfo.ignoreCaseEnum(LogCleanerManager.class, alias); } @@ -58,17 +79,8 @@ public String toString() { return alias(); } - public BeanQuery query() { - return BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", "LogCleanerManager") - .property("logDirectory", "*") - .property("name", metricName) - .build(); - } - public List fetch(MBeanClient mBeanClient) { - return mBeanClient.beans(query()).stream() + return mBeanClient.beans(ALL.get(this)).stream() .map(Gauge::new) .collect(Collectors.toUnmodifiableList()); } @@ -108,6 +120,28 @@ public enum Log implements EnumInfo { NUM_LOG_SEGMENTS("NumLogSegments"), SIZE("Size"); + private static final Map ALL = + Arrays.stream(Log.values()) + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), + m -> + BeanQuery.builder() + .domainName(DOMAIN_NAME) + .property("type", LOG_TYPE) + .property("topic", "*") + .property("partition", "*") + .property("name", m.metricName) + // Due to a Kafka bug. This log metrics might come with an `is-future` + // property + // with it. + // And the property is never removed even if the folder migration is + // done. + // We use the BeanQuery property list pattern to work around this issue. + // See https://github.com/apache/kafka/pull/12979 + .usePropertyListPattern() + .build())); + public static Log ofAlias(String alias) { return EnumInfo.ignoreCaseEnum(Log.class, alias); } @@ -140,24 +174,8 @@ public static Collection gauges(Collection beans, Log type .collect(Collectors.toUnmodifiableList()); } - public BeanQuery query() { - return BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", LOG_TYPE) - .property("topic", "*") - .property("partition", "*") - .property("name", metricName) - // Due to a Kafka bug. This log metrics might come with an `is-future` property - // with it. - // And the property is never removed even if the folder migration is done. - // We use the BeanQuery property list pattern to work around this issue. - // See https://github.com/apache/kafka/pull/12979 - .usePropertyListPattern() - .build(); - } - public List fetch(MBeanClient mBeanClient) { - return mBeanClient.beans(query()).stream() + return mBeanClient.beans(ALL.get(this)).stream() .map(Gauge::new) .collect(Collectors.toUnmodifiableList()); } diff --git a/common/src/main/java/org/astraea/common/metrics/broker/NetworkMetrics.java b/common/src/main/java/org/astraea/common/metrics/broker/NetworkMetrics.java index 19028d4e29..70437bc975 100644 --- a/common/src/main/java/org/astraea/common/metrics/broker/NetworkMetrics.java +++ b/common/src/main/java/org/astraea/common/metrics/broker/NetworkMetrics.java @@ -16,12 +16,19 @@ */ package org.astraea.common.metrics.broker; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; import org.astraea.common.EnumInfo; import org.astraea.common.metrics.BeanObject; import org.astraea.common.metrics.BeanQuery; import org.astraea.common.metrics.MBeanClient; public class NetworkMetrics { + public static final Collection QUERIES = List.copyOf(Request.ALL.values()); public enum Request implements EnumInfo { PRODUCE("Produce"), @@ -84,6 +91,19 @@ public enum Request implements EnumInfo { LIST_TRANSACTIONS("ListTransactions"), ALLOCATE_PRODUCER_IDS("AllocateProducerIds"); + private static final Map ALL = + Arrays.stream(Request.values()) + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), + m -> + BeanQuery.builder() + .domainName("kafka.network") + .property("type", "RequestMetrics") + .property("request", m.metricName()) + .property("name", "TotalTimeMs") + .build())); + public static Request ofAlias(String alias) { return EnumInfo.ignoreCaseEnum(Request.class, alias); } @@ -108,17 +128,8 @@ public String toString() { return alias(); } - public BeanQuery query() { - return BeanQuery.builder() - .domainName("kafka.network") - .property("type", "RequestMetrics") - .property("request", this.metricName()) - .property("name", "TotalTimeMs") - .build(); - } - public Histogram fetch(MBeanClient mBeanClient) { - return new Histogram(mBeanClient.bean(query())); + return new Histogram(mBeanClient.bean(ALL.get(this))); } public static class Histogram implements HasHistogram { diff --git a/common/src/main/java/org/astraea/common/metrics/broker/ServerMetrics.java b/common/src/main/java/org/astraea/common/metrics/broker/ServerMetrics.java index 00e856d79b..7823d01937 100644 --- a/common/src/main/java/org/astraea/common/metrics/broker/ServerMetrics.java +++ b/common/src/main/java/org/astraea/common/metrics/broker/ServerMetrics.java @@ -16,14 +16,18 @@ */ package org.astraea.common.metrics.broker; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.astraea.common.EnumInfo; +import org.astraea.common.Utils; import org.astraea.common.metrics.AppInfo; import org.astraea.common.metrics.BeanObject; import org.astraea.common.metrics.BeanQuery; @@ -33,39 +37,52 @@ public final class ServerMetrics { public static final String DOMAIN_NAME = "kafka.server"; + public static final BeanQuery APP_INFO_QUERY = + BeanQuery.builder() + .domainName(DOMAIN_NAME) + .property("type", "app-info") + .property("id", "*") + .build(); + + public static final Collection QUERIES = + Stream.of( + Stream.of(APP_INFO_QUERY), + KafkaServer.ALL.values().stream(), + DelayedOperationPurgatory.ALL.values().stream(), + Topic.ALL.values().stream(), + BrokerTopic.ALL.values().stream(), + ReplicaManager.ALL.values().stream(), + Socket.QUERIES.stream()) + .flatMap(f -> f) + .collect(Collectors.toUnmodifiableList()); + public static List appInfo(MBeanClient client) { - return client - .beans( - BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", "app-info") - .property("id", "*") - .build()) - .stream() + return client.beans(APP_INFO_QUERY).stream() .map(b -> (AppInfo) () -> b) .collect(Collectors.toList()); } public enum KafkaServer implements EnumInfo { + CLUSTER_ID("ClusterId"), YAMMER_METRICS_COUNT("yammer-metrics-count"), BROKER_STATE("BrokerState"), LINUX_DISK_READ_BYTES("linux-disk-read-bytes"), LINUX_DISK_WRITE_BYTES("linux-disk-write-bytes"); /** Others are Gauge-Number , this is Gauge-String */ - public static final String CLUSTER_ID = "ClusterId"; - private final String metricName; - public static ClusterIdGauge clusterId(MBeanClient mBeanClient) { - return new ClusterIdGauge( - mBeanClient.bean( - BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", "KafkaServer") - .property("name", CLUSTER_ID) - .build())); - } + private static final Map ALL = + Arrays.stream(KafkaServer.values()) + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), + m -> + BeanQuery.builder() + .domainName(DOMAIN_NAME) + .property("type", "KafkaServer") + .property("name", m.metricName) + .build())); KafkaServer(String name) { this.metricName = name; @@ -75,16 +92,13 @@ public String metricName() { return metricName; } - public BeanQuery query() { - return BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", "KafkaServer") - .property("name", metricName) - .build(); - } - - public Gauge fetch(MBeanClient mBeanClient) { - return new Gauge(mBeanClient.bean(query())); + public HasBeanObject fetch(MBeanClient mBeanClient) { + switch (this) { + case CLUSTER_ID: + return new ClusterIdGauge(mBeanClient.bean(ALL.get(this))); + default: + return new Gauge(mBeanClient.bean(ALL.get(this))); + } } public static KafkaServer ofAlias(String alias) { @@ -145,6 +159,19 @@ public enum DelayedOperationPurgatory implements EnumInfo { PRODUCE("Produce"), REBALANCE("Rebalance"); + private static final Map ALL = + Arrays.stream(DelayedOperationPurgatory.values()) + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), + m -> + BeanQuery.builder() + .domainName(DOMAIN_NAME) + .property("type", "DelayedOperationPurgatory") + .property("delayedOperation", m.metricName) + .property("name", "PurgatorySize") + .build())); + public static DelayedOperationPurgatory ofAlias(String alias) { return EnumInfo.ignoreCaseEnum(DelayedOperationPurgatory.class, alias); } @@ -169,17 +196,8 @@ public String toString() { return alias(); } - public BeanQuery query() { - return BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", "DelayedOperationPurgatory") - .property("delayedOperation", metricName) - .property("name", "PurgatorySize") - .build(); - } - public Gauge fetch(MBeanClient mBeanClient) { - return new Gauge(mBeanClient.bean(query())); + return new Gauge(mBeanClient.bean(ALL.get(this))); } public static class Gauge implements HasGauge { @@ -211,6 +229,19 @@ public enum Topic implements EnumInfo { TOTAL_FETCH_REQUESTS_PER_SEC("TotalFetchRequestsPerSec"), TOTAL_PRODUCE_REQUESTS_PER_SEC("TotalProduceRequestsPerSec"); + private static final Map ALL = + Arrays.stream(Topic.values()) + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), + m -> + BeanQuery.builder() + .domainName(DOMAIN_NAME) + .property("type", "BrokerTopicMetrics") + .property("topic", "*") + .property("name", m.metricName()) + .build())); + public static Topic ofAlias(String alias) { return EnumInfo.ignoreCaseEnum(Topic.class, alias); } @@ -235,17 +266,10 @@ public String toString() { return alias(); } - public BeanQuery query() { - return BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", "BrokerTopicMetrics") - .property("topic", "*") - .property("name", this.metricName()) - .build(); - } - public List fetch(MBeanClient mBeanClient) { - return mBeanClient.beans(query()).stream().map(Topic.Meter::new).collect(Collectors.toList()); + return mBeanClient.beans(ALL.get(this)).stream() + .map(Topic.Meter::new) + .collect(Collectors.toList()); } public Builder builder() { @@ -395,6 +419,18 @@ public enum BrokerTopic implements EnumInfo { /** Byte out rate to clients. */ BYTES_OUT_PER_SEC("BytesOutPerSec"); + private static final Map ALL = + Arrays.stream(BrokerTopic.values()) + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), + m -> + BeanQuery.builder() + .domainName(DOMAIN_NAME) + .property("type", "BrokerTopicMetrics") + .property("name", m.metricName()) + .build())); + public static BrokerTopic ofAlias(String alias) { return EnumInfo.ignoreCaseEnum(BrokerTopic.class, alias); } @@ -434,16 +470,8 @@ public Collection of(Collection objects) { .collect(Collectors.toUnmodifiableList()); } - public BeanQuery query() { - return BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", "BrokerTopicMetrics") - .property("name", this.metricName()) - .build(); - } - public Meter fetch(MBeanClient mBeanClient) { - return new Meter(mBeanClient.bean(query())); + return new Meter(mBeanClient.bean(ALL.get(this))); } public static class Meter implements HasMeter { @@ -483,6 +511,18 @@ public enum ReplicaManager implements EnumInfo { UNDER_MIN_ISR_PARTITION_COUNT("UnderMinIsrPartitionCount"), UNDER_REPLICATED_PARTITIONS("UnderReplicatedPartitions"); + private static final Map ALL = + Arrays.stream(ReplicaManager.values()) + .collect( + Collectors.toUnmodifiableMap( + Function.identity(), + m -> + BeanQuery.builder() + .domainName(DOMAIN_NAME) + .property("type", "ReplicaManager") + .property("name", m.metricName) + .build())); + public static ReplicaManager ofAlias(String alias) { return EnumInfo.ignoreCaseEnum(ReplicaManager.class, alias); } @@ -497,16 +537,8 @@ public String metricName() { return metricName; } - public BeanQuery query() { - return BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", "ReplicaManager") - .property("name", metricName) - .build(); - } - public Gauge fetch(MBeanClient mBeanClient) { - return new Gauge(mBeanClient.bean(query())); + return new Gauge(mBeanClient.bean(ALL.get(this))); } @Override @@ -554,54 +586,56 @@ public static class Socket { private static final String PROP_CLIENT_SOFTWARE_NAME = "clientSoftwareName"; private static final String PROP_CLIENT_SOFTWARE_VERSION = "clientSoftwareVersion"; + public static final BeanQuery SOCKET_LISTENER_QUERY = + BeanQuery.builder() + .domainName(DOMAIN_NAME) + .property("type", METRIC_TYPE) + .property(PROP_LISTENER, "*") + .build(); + + public static final BeanQuery SOCKET_NETWORK_PROCESSOR_QUERY = + BeanQuery.builder() + .domainName(DOMAIN_NAME) + .property("type", METRIC_TYPE) + .property(PROP_LISTENER, "*") + .property(PROP_NETWORK_PROCESSOR, "*") + .build(); + + public static final BeanQuery CLIENT_QUERY = + BeanQuery.builder() + .domainName(DOMAIN_NAME) + .property("type", METRIC_TYPE) + .property(PROP_LISTENER, "*") + .property(PROP_NETWORK_PROCESSOR, "*") + .property(PROP_CLIENT_SOFTWARE_NAME, "*") + .property(PROP_CLIENT_SOFTWARE_VERSION, "*") + .build(); + + public static final BeanQuery SOCKET_QUERY = + BeanQuery.builder().domainName(DOMAIN_NAME).property("type", METRIC_TYPE).build(); + + public static final Collection QUERIES = + Utils.constants(Socket.class, name -> name.endsWith("QUERY"), BeanQuery.class); + public static SocketMetric socket(MBeanClient mBeanClient) { - return new SocketMetric( - mBeanClient.bean( - BeanQuery.builder().domainName(DOMAIN_NAME).property("type", METRIC_TYPE).build())); + return new SocketMetric(mBeanClient.bean(SOCKET_QUERY)); } public static List socketListener(MBeanClient mBeanClient) { - return mBeanClient - .beans( - BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", METRIC_TYPE) - .property(PROP_LISTENER, "*") - .build()) - .stream() + return mBeanClient.beans(SOCKET_LISTENER_QUERY).stream() .map(SocketListenerMetric::new) .collect(Collectors.toList()); } public static List socketNetworkProcessor( MBeanClient mBeanClient) { - return mBeanClient - .beans( - BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", METRIC_TYPE) - .property(PROP_LISTENER, "*") - .property(PROP_NETWORK_PROCESSOR, "*") - .build()) - .stream() + return mBeanClient.beans(SOCKET_NETWORK_PROCESSOR_QUERY).stream() .map(SocketNetworkProcessorMetric::new) .collect(Collectors.toList()); } public static List client(MBeanClient mBeanClient) { - return mBeanClient - .beans( - BeanQuery.builder() - .domainName(DOMAIN_NAME) - .property("type", METRIC_TYPE) - .property(PROP_LISTENER, "*") - .property(PROP_NETWORK_PROCESSOR, "*") - .property(PROP_CLIENT_SOFTWARE_NAME, "*") - .property(PROP_CLIENT_SOFTWARE_VERSION, "*") - .build()) - .stream() - .map(Client::new) - .collect(Collectors.toList()); + return mBeanClient.beans(CLIENT_QUERY).stream().map(Client::new).collect(Collectors.toList()); } public static class SocketMetric implements HasBeanObject { diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricFetcher.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricFetcher.java index 83a716d8ba..814e0390c6 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricFetcher.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricFetcher.java @@ -35,17 +35,43 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.astraea.common.FutureUtils; import org.astraea.common.Utils; import org.astraea.common.metrics.BeanObject; import org.astraea.common.metrics.BeanQuery; import org.astraea.common.metrics.MBeanClient; +import org.astraea.common.metrics.broker.ClusterMetrics; +import org.astraea.common.metrics.broker.ControllerMetrics; +import org.astraea.common.metrics.broker.LogMetrics; +import org.astraea.common.metrics.broker.NetworkMetrics; +import org.astraea.common.metrics.broker.ServerMetrics; +import org.astraea.common.metrics.client.admin.AdminMetrics; +import org.astraea.common.metrics.client.consumer.ConsumerMetrics; +import org.astraea.common.metrics.client.producer.ProducerMetrics; +import org.astraea.common.metrics.connector.ConnectorMetrics; +import org.astraea.common.metrics.platform.HostMetrics; import org.astraea.common.producer.Producer; import org.astraea.common.producer.Record; import org.astraea.common.producer.Serializer; public interface MetricFetcher extends AutoCloseable { + Collection QUERIES = + Stream.of( + LogMetrics.QUERIES.stream(), + ServerMetrics.QUERIES.stream(), + NetworkMetrics.QUERIES.stream(), + ClusterMetrics.QUERIES.stream(), + ControllerMetrics.QUERIES.stream(), + AdminMetrics.QUERIES.stream(), + ConsumerMetrics.QUERIES.stream(), + ProducerMetrics.QUERIES.stream(), + ConnectorMetrics.QUERIES.stream(), + HostMetrics.QUERIES.stream()) + .flatMap(s -> s) + .collect(Collectors.toUnmodifiableList()); + static Builder builder() { return new Builder(); } @@ -246,7 +272,10 @@ private void updateData(DelayedIdentity identity) { lock.readLock().lock(); Collection beans; try { - beans = clients.get(identity.id).beans(BeanQuery.all(), e -> {}); + beans = + QUERIES.stream() + .flatMap(q -> clients.get(identity.id).beans(q, e -> {}).stream()) + .collect(Collectors.toUnmodifiableList()); } finally { lock.readLock().unlock(); } diff --git a/common/src/test/java/org/astraea/common/cost/utils/ClusterInfoSensorTest.java b/common/src/test/java/org/astraea/common/cost/utils/ClusterInfoSensorTest.java index ee1f7c2475..965e7947e6 100644 --- a/common/src/test/java/org/astraea/common/cost/utils/ClusterInfoSensorTest.java +++ b/common/src/test/java/org/astraea/common/cost/utils/ClusterInfoSensorTest.java @@ -238,7 +238,7 @@ void testClusterId() { "type", "KafkaServer", "name", - ServerMetrics.KafkaServer.CLUSTER_ID), + ServerMetrics.KafkaServer.CLUSTER_ID.metricName()), Map.of("Value", id)))))); var info = ClusterInfoSensor.metricViewCluster(cb); diff --git a/common/src/test/java/org/astraea/common/metrics/BeanQueryIntegratedTest.java b/common/src/test/java/org/astraea/common/metrics/BeanQueryIntegratedTest.java new file mode 100644 index 0000000000..ba67cae671 --- /dev/null +++ b/common/src/test/java/org/astraea/common/metrics/BeanQueryIntegratedTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.metrics; + +import java.util.HashSet; +import java.util.Map; +import org.astraea.common.metrics.collector.MetricFetcher; +import org.astraea.it.Service; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class BeanQueryIntegratedTest { + private static final Service SERVICE = + Service.builder().numberOfWorkers(1).numberOfBrokers(1).build(); + + @AfterAll + static void closeService() { + SERVICE.close(); + } + + @Test + void testAllBuiltInQueries() { + try (var client = MBeanClient.of(SERVICE.jmxServiceURL())) { + var exist = new HashSet>(); + MetricFetcher.QUERIES.forEach( + q -> + client + .beans(q) + .forEach( + bean -> { + Assertions.assertFalse(exist.contains(bean.properties())); + exist.add(bean.properties()); + })); + } + } +} diff --git a/common/src/test/java/org/astraea/common/metrics/broker/ServerMetricsTest.java b/common/src/test/java/org/astraea/common/metrics/broker/ServerMetricsTest.java index 04d65071a9..a9f3079eec 100644 --- a/common/src/test/java/org/astraea/common/metrics/broker/ServerMetricsTest.java +++ b/common/src/test/java/org/astraea/common/metrics/broker/ServerMetricsTest.java @@ -79,7 +79,7 @@ void testKafkaServer(ServerMetrics.KafkaServer request) { @Test void testKafkaServerOtherMetrics() { - MetricsTestUtil.validate(ServerMetrics.KafkaServer.clusterId(MBeanClient.local())); + MetricsTestUtil.validate(ServerMetrics.KafkaServer.CLUSTER_ID.fetch(MBeanClient.local())); } @Test diff --git a/common/src/test/java/org/astraea/common/metrics/collector/MetricFetcherTest.java b/common/src/test/java/org/astraea/common/metrics/collector/MetricFetcherTest.java index 23f2aa4377..75134668ab 100644 --- a/common/src/test/java/org/astraea/common/metrics/collector/MetricFetcherTest.java +++ b/common/src/test/java/org/astraea/common/metrics/collector/MetricFetcherTest.java @@ -72,11 +72,19 @@ void testPublishAndClose() { Utils.sleep(Duration.ofSeconds(3)); Assertions.assertEquals(Set.of(-1000), fetcher.identities()); Assertions.assertNotEquals(0, queue.size()); - queue.forEach((id, es) -> Assertions.assertEquals(beans, es)); + queue.forEach( + (id, es) -> + Assertions.assertEquals( + beans, es.stream().distinct().collect(Collectors.toUnmodifiableList()))); var latest = fetcher.latest(); Assertions.assertEquals(1, latest.size()); - latest.values().forEach(bs -> Assertions.assertEquals(beans, bs)); + latest + .values() + .forEach( + bs -> + Assertions.assertEquals( + beans, bs.stream().distinct().collect(Collectors.toUnmodifiableList()))); } // make sure client get closed Mockito.verify(client, Mockito.times(1)).close();