From 98daa0b4f9225b0abc3c1a78c0a13f19917736cd Mon Sep 17 00:00:00 2001 From: Geordie Date: Sun, 7 Aug 2022 00:27:23 +0800 Subject: [PATCH] Add Kafka controller metrics (#554) --- .../app/metrics/broker/ControllerMetrics.java | 87 +++++++++++++++++++ .../astraea/app/metrics/MetricsTestUtil.java | 30 +++++++ .../metrics/broker/ControllerMetricsTest.java | 61 +++++++++++++ .../app/metrics/broker/LogMetricsTest.java | 7 ++ .../metrics/broker/NetworkMetricsTest.java | 9 ++ .../broker/ReplicaManagerMetricsTest.java | 9 ++ .../app/metrics/broker/ServerMetricsTest.java | 21 ++--- 7 files changed, 214 insertions(+), 10 deletions(-) create mode 100644 app/src/main/java/org/astraea/app/metrics/broker/ControllerMetrics.java create mode 100644 app/src/test/java/org/astraea/app/metrics/MetricsTestUtil.java create mode 100644 app/src/test/java/org/astraea/app/metrics/broker/ControllerMetricsTest.java diff --git a/app/src/main/java/org/astraea/app/metrics/broker/ControllerMetrics.java b/app/src/main/java/org/astraea/app/metrics/broker/ControllerMetrics.java new file mode 100644 index 0000000000..a950a410be --- /dev/null +++ b/app/src/main/java/org/astraea/app/metrics/broker/ControllerMetrics.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.metrics.broker; + +import java.util.Arrays; +import org.astraea.app.metrics.BeanObject; +import org.astraea.app.metrics.BeanQuery; +import org.astraea.app.metrics.MBeanClient; + +public class ControllerMetrics { + public enum Controller { + ACTIVE_CONTROLLER_COUNT("ActiveControllerCount"), + OFFLINE_PARTITIONS_COUNT("OfflinePartitionsCount"), + PREFERRED_REPLICA_IMBALANCE_COUNT("PreferredReplicaImbalanceCount"), + CONTROLLER_STATE("ControllerState"), + GLOBAL_TOPIC_COUNT("GlobalTopicCount"), + GLOBAL_PARTITION_COUNT("GlobalPartitionCount"), + TOPICS_TO_DELETE_COUNT("TopicsToDeleteCount"), + REPLICAS_TO_DELETE_COUNT("ReplicasToDeleteCount"), + TOPICS_INELIGIBLE_TO_DELETE_COUNT("TopicsIneligibleToDeleteCount"), + REPLICAS_INELIGIBLE_TO_DELETE_COUNT("ReplicasIneligibleToDeleteCount"), + ACTIVE_BROKER_COUNT("ActiveBrokerCount"), + FENCED_BROKER_COUNT("FencedBrokerCount"); + + static Controller of(String metricName) { + return Arrays.stream(Controller.values()) + .filter(metric -> metric.metricName().equalsIgnoreCase(metricName)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No such metric: " + metricName)); + } + + private final String metricName; + + Controller(String metricName) { + this.metricName = metricName; + } + + public String metricName() { + return metricName; + } + + public Meter fetch(MBeanClient mBeanClient) { + return new Meter( + mBeanClient.queryBean( + BeanQuery.builder() + .domainName("kafka.controller") + .property("type", "KafkaController") + .property("name", this.metricName()) + .build())); + } + + public static class Meter implements HasValue { + private final BeanObject beanObject; + + public Meter(BeanObject beanObject) { + this.beanObject = beanObject; + } + + public String metricsName() { + return beanObject().properties().get("name"); + } + + public Controller type() { + return Controller.of(metricsName()); + } + + @Override + public BeanObject beanObject() { + return beanObject; + } + } + } +} diff --git a/app/src/test/java/org/astraea/app/metrics/MetricsTestUtil.java b/app/src/test/java/org/astraea/app/metrics/MetricsTestUtil.java new file mode 100644 index 0000000000..bd978b065c --- /dev/null +++ b/app/src/test/java/org/astraea/app/metrics/MetricsTestUtil.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.metrics; + +import java.util.Arrays; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class MetricsTestUtil { + + public static > boolean metricDistinct( + T[] tEnums, Function getMetricName) { + var set = Arrays.stream(tEnums).map(getMetricName).collect(Collectors.toSet()); + return set.size() == tEnums.length; + } +} diff --git a/app/src/test/java/org/astraea/app/metrics/broker/ControllerMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/broker/ControllerMetricsTest.java new file mode 100644 index 0000000000..523a65e214 --- /dev/null +++ b/app/src/test/java/org/astraea/app/metrics/broker/ControllerMetricsTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.metrics.broker; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Arrays; +import java.util.Locale; +import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.metrics.MetricsTestUtil; +import org.astraea.app.service.RequireSingleBrokerCluster; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +class ControllerMetricsTest extends RequireSingleBrokerCluster { + + @ParameterizedTest + @EnumSource(ControllerMetrics.Controller.class) + void testController(ControllerMetrics.Controller controller) { + var meter = controller.fetch(MBeanClient.local()); + Assertions.assertTrue(meter.value() >= 0); + Assertions.assertEquals(controller, meter.type()); + } + + @Test + void testKafkaMetricsOf() { + Arrays.stream(ControllerMetrics.Controller.values()) + .forEach( + t -> { + Assertions.assertEquals( + t, ControllerMetrics.Controller.of(t.metricName().toLowerCase(Locale.ROOT))); + Assertions.assertEquals( + t, ControllerMetrics.Controller.of(t.metricName().toUpperCase(Locale.ROOT))); + }); + + assertThrows(IllegalArgumentException.class, () -> ControllerMetrics.Controller.of("nothing")); + } + + @Test + void testAllEnumNameUnique() { + Assertions.assertTrue( + MetricsTestUtil.metricDistinct( + ControllerMetrics.Controller.values(), ControllerMetrics.Controller::metricName)); + } +} diff --git a/app/src/test/java/org/astraea/app/metrics/broker/LogMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/broker/LogMetricsTest.java index e1263a3be5..79701adbe3 100644 --- a/app/src/test/java/org/astraea/app/metrics/broker/LogMetricsTest.java +++ b/app/src/test/java/org/astraea/app/metrics/broker/LogMetricsTest.java @@ -25,6 +25,7 @@ import org.astraea.app.common.Utils; import org.astraea.app.metrics.HasBeanObject; import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.metrics.MetricsTestUtil; import org.astraea.app.service.RequireSingleBrokerCluster; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -86,4 +87,10 @@ void testTopicPartitionMetrics(LogMetrics.Log request) { var beans = request.fetch(MBeanClient.local()); assertNotEquals(0, beans.size()); } + + @Test + void testAllEnumNameUnique() { + Assertions.assertTrue( + MetricsTestUtil.metricDistinct(LogMetrics.Log.values(), LogMetrics.Log::metricName)); + } } diff --git a/app/src/test/java/org/astraea/app/metrics/broker/NetworkMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/broker/NetworkMetricsTest.java index ad5202a0bb..abd9c9417e 100644 --- a/app/src/test/java/org/astraea/app/metrics/broker/NetworkMetricsTest.java +++ b/app/src/test/java/org/astraea/app/metrics/broker/NetworkMetricsTest.java @@ -17,8 +17,10 @@ package org.astraea.app.metrics.broker; import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.metrics.MetricsTestUtil; import org.astraea.app.service.RequireSingleBrokerCluster; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -44,4 +46,11 @@ void testRequestTotalTimeMs(NetworkMetrics.Request request) { Assertions.assertDoesNotThrow(totalTimeMs::stdDev); Assertions.assertEquals(request, totalTimeMs.type()); } + + @Test + void testAllEnumNameUnique() { + Assertions.assertTrue( + MetricsTestUtil.metricDistinct( + NetworkMetrics.Request.values(), NetworkMetrics.Request::metricName)); + } } diff --git a/app/src/test/java/org/astraea/app/metrics/broker/ReplicaManagerMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/broker/ReplicaManagerMetricsTest.java index acfa47140d..c5859def20 100644 --- a/app/src/test/java/org/astraea/app/metrics/broker/ReplicaManagerMetricsTest.java +++ b/app/src/test/java/org/astraea/app/metrics/broker/ReplicaManagerMetricsTest.java @@ -17,8 +17,10 @@ package org.astraea.app.metrics.broker; import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.metrics.MetricsTestUtil; import org.astraea.app.service.RequireSingleBrokerCluster; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -31,4 +33,11 @@ void testBrokerTopic(ServerMetrics.ReplicaManager rm) { Assertions.assertTrue(meter.value() >= 0); Assertions.assertEquals(rm, meter.type()); } + + @Test + void testAllEnumNameUnique() { + Assertions.assertTrue( + MetricsTestUtil.metricDistinct( + ServerMetrics.ReplicaManager.values(), ServerMetrics.ReplicaManager::metricName)); + } } diff --git a/app/src/test/java/org/astraea/app/metrics/broker/ServerMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/broker/ServerMetricsTest.java index 9d0d808fa4..1e642c5b0a 100644 --- a/app/src/test/java/org/astraea/app/metrics/broker/ServerMetricsTest.java +++ b/app/src/test/java/org/astraea/app/metrics/broker/ServerMetricsTest.java @@ -22,11 +22,10 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import org.astraea.app.common.Utils; import org.astraea.app.metrics.BeanObject; import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.metrics.MetricsTestUtil; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -76,13 +75,15 @@ void testBrokerTopic(ServerMetrics.Topic brokerTopic) { @Test void testAllEnumNameUnique() { - // arrange act - Set collectedName = - Arrays.stream(ServerMetrics.Topic.values()) - .map(ServerMetrics.Topic::metricName) - .collect(Collectors.toSet()); - - // assert - Assertions.assertEquals(ServerMetrics.Topic.values().length, collectedName.size()); + Assertions.assertTrue( + MetricsTestUtil.metricDistinct( + ServerMetrics.ReplicaManager.values(), ServerMetrics.ReplicaManager::metricName)); + Assertions.assertTrue( + MetricsTestUtil.metricDistinct( + ServerMetrics.DelayedOperationPurgatory.values(), + ServerMetrics.DelayedOperationPurgatory::metricName)); + Assertions.assertTrue( + MetricsTestUtil.metricDistinct( + ServerMetrics.Topic.values(), ServerMetrics.Topic::metricName)); } }