Skip to content

Commit

Permalink
Add Kafka controller metrics (opensource4you#554)
Browse files Browse the repository at this point in the history
  • Loading branch information
g1geordie authored Aug 6, 2022
1 parent d70b167 commit 98daa0b
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.metrics.broker;

import java.util.Arrays;
import org.astraea.app.metrics.BeanObject;
import org.astraea.app.metrics.BeanQuery;
import org.astraea.app.metrics.MBeanClient;

public class ControllerMetrics {
public enum Controller {
ACTIVE_CONTROLLER_COUNT("ActiveControllerCount"),
OFFLINE_PARTITIONS_COUNT("OfflinePartitionsCount"),
PREFERRED_REPLICA_IMBALANCE_COUNT("PreferredReplicaImbalanceCount"),
CONTROLLER_STATE("ControllerState"),
GLOBAL_TOPIC_COUNT("GlobalTopicCount"),
GLOBAL_PARTITION_COUNT("GlobalPartitionCount"),
TOPICS_TO_DELETE_COUNT("TopicsToDeleteCount"),
REPLICAS_TO_DELETE_COUNT("ReplicasToDeleteCount"),
TOPICS_INELIGIBLE_TO_DELETE_COUNT("TopicsIneligibleToDeleteCount"),
REPLICAS_INELIGIBLE_TO_DELETE_COUNT("ReplicasIneligibleToDeleteCount"),
ACTIVE_BROKER_COUNT("ActiveBrokerCount"),
FENCED_BROKER_COUNT("FencedBrokerCount");

static Controller of(String metricName) {
return Arrays.stream(Controller.values())
.filter(metric -> metric.metricName().equalsIgnoreCase(metricName))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("No such metric: " + metricName));
}

private final String metricName;

Controller(String metricName) {
this.metricName = metricName;
}

public String metricName() {
return metricName;
}

public Meter fetch(MBeanClient mBeanClient) {
return new Meter(
mBeanClient.queryBean(
BeanQuery.builder()
.domainName("kafka.controller")
.property("type", "KafkaController")
.property("name", this.metricName())
.build()));
}

public static class Meter implements HasValue {
private final BeanObject beanObject;

public Meter(BeanObject beanObject) {
this.beanObject = beanObject;
}

public String metricsName() {
return beanObject().properties().get("name");
}

public Controller type() {
return Controller.of(metricsName());
}

@Override
public BeanObject beanObject() {
return beanObject;
}
}
}
}
30 changes: 30 additions & 0 deletions app/src/test/java/org/astraea/app/metrics/MetricsTestUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.metrics;

import java.util.Arrays;
import java.util.function.Function;
import java.util.stream.Collectors;

public class MetricsTestUtil {

public static <T extends Enum<T>> boolean metricDistinct(
T[] tEnums, Function<T, String> getMetricName) {
var set = Arrays.stream(tEnums).map(getMetricName).collect(Collectors.toSet());
return set.size() == tEnums.length;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.metrics.broker;

import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.Arrays;
import java.util.Locale;
import org.astraea.app.metrics.MBeanClient;
import org.astraea.app.metrics.MetricsTestUtil;
import org.astraea.app.service.RequireSingleBrokerCluster;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class ControllerMetricsTest extends RequireSingleBrokerCluster {

@ParameterizedTest
@EnumSource(ControllerMetrics.Controller.class)
void testController(ControllerMetrics.Controller controller) {
var meter = controller.fetch(MBeanClient.local());
Assertions.assertTrue(meter.value() >= 0);
Assertions.assertEquals(controller, meter.type());
}

@Test
void testKafkaMetricsOf() {
Arrays.stream(ControllerMetrics.Controller.values())
.forEach(
t -> {
Assertions.assertEquals(
t, ControllerMetrics.Controller.of(t.metricName().toLowerCase(Locale.ROOT)));
Assertions.assertEquals(
t, ControllerMetrics.Controller.of(t.metricName().toUpperCase(Locale.ROOT)));
});

assertThrows(IllegalArgumentException.class, () -> ControllerMetrics.Controller.of("nothing"));
}

@Test
void testAllEnumNameUnique() {
Assertions.assertTrue(
MetricsTestUtil.metricDistinct(
ControllerMetrics.Controller.values(), ControllerMetrics.Controller::metricName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.HasBeanObject;
import org.astraea.app.metrics.MBeanClient;
import org.astraea.app.metrics.MetricsTestUtil;
import org.astraea.app.service.RequireSingleBrokerCluster;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -86,4 +87,10 @@ void testTopicPartitionMetrics(LogMetrics.Log request) {
var beans = request.fetch(MBeanClient.local());
assertNotEquals(0, beans.size());
}

@Test
void testAllEnumNameUnique() {
Assertions.assertTrue(
MetricsTestUtil.metricDistinct(LogMetrics.Log.values(), LogMetrics.Log::metricName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package org.astraea.app.metrics.broker;

import org.astraea.app.metrics.MBeanClient;
import org.astraea.app.metrics.MetricsTestUtil;
import org.astraea.app.service.RequireSingleBrokerCluster;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

Expand All @@ -44,4 +46,11 @@ void testRequestTotalTimeMs(NetworkMetrics.Request request) {
Assertions.assertDoesNotThrow(totalTimeMs::stdDev);
Assertions.assertEquals(request, totalTimeMs.type());
}

@Test
void testAllEnumNameUnique() {
Assertions.assertTrue(
MetricsTestUtil.metricDistinct(
NetworkMetrics.Request.values(), NetworkMetrics.Request::metricName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package org.astraea.app.metrics.broker;

import org.astraea.app.metrics.MBeanClient;
import org.astraea.app.metrics.MetricsTestUtil;
import org.astraea.app.service.RequireSingleBrokerCluster;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

Expand All @@ -31,4 +33,11 @@ void testBrokerTopic(ServerMetrics.ReplicaManager rm) {
Assertions.assertTrue(meter.value() >= 0);
Assertions.assertEquals(rm, meter.type());
}

@Test
void testAllEnumNameUnique() {
Assertions.assertTrue(
MetricsTestUtil.metricDistinct(
ServerMetrics.ReplicaManager.values(), ServerMetrics.ReplicaManager::metricName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.astraea.app.common.Utils;
import org.astraea.app.metrics.BeanObject;
import org.astraea.app.metrics.MBeanClient;
import org.astraea.app.metrics.MetricsTestUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -76,13 +75,15 @@ void testBrokerTopic(ServerMetrics.Topic brokerTopic) {

@Test
void testAllEnumNameUnique() {
// arrange act
Set<String> collectedName =
Arrays.stream(ServerMetrics.Topic.values())
.map(ServerMetrics.Topic::metricName)
.collect(Collectors.toSet());

// assert
Assertions.assertEquals(ServerMetrics.Topic.values().length, collectedName.size());
Assertions.assertTrue(
MetricsTestUtil.metricDistinct(
ServerMetrics.ReplicaManager.values(), ServerMetrics.ReplicaManager::metricName));
Assertions.assertTrue(
MetricsTestUtil.metricDistinct(
ServerMetrics.DelayedOperationPurgatory.values(),
ServerMetrics.DelayedOperationPurgatory::metricName));
Assertions.assertTrue(
MetricsTestUtil.metricDistinct(
ServerMetrics.Topic.values(), ServerMetrics.Topic::metricName));
}
}

0 comments on commit 98daa0b

Please sign in to comment.