From 4e787f980e1fc8b6ffcefb1a08de05b9fbd5edcb Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 28 Jul 2022 20:41:42 +0800 Subject: [PATCH] Move purgatory metrics to separate file (#515) --- .../org/astraea/app/metrics/KafkaMetrics.java | 48 ---------- .../app/metrics/broker/ServerMetrics.java | 89 +++++++++++++++++++ .../astraea/app/metrics/KafkaMetricsTest.java | 7 -- .../app/metrics/broker/ServerMetricsTest.java | 31 +++++++ 4 files changed, 120 insertions(+), 55 deletions(-) create mode 100644 app/src/main/java/org/astraea/app/metrics/broker/ServerMetrics.java create mode 100644 app/src/test/java/org/astraea/app/metrics/broker/ServerMetricsTest.java diff --git a/app/src/main/java/org/astraea/app/metrics/KafkaMetrics.java b/app/src/main/java/org/astraea/app/metrics/KafkaMetrics.java index 38dab18a42..ff4587b473 100644 --- a/app/src/main/java/org/astraea/app/metrics/KafkaMetrics.java +++ b/app/src/main/java/org/astraea/app/metrics/KafkaMetrics.java @@ -158,54 +158,6 @@ public static long linuxDiskWriteBytes(MBeanClient mBeanClient) { } } - public enum Purgatory { - AlterAcls("AlterAcls"), - DeleteRecords("DeleteRecords"), - ElectLeader("ElectLeader"), - Fetch("Fetch"), - Heartbeat("Heartbeat"), - Produce("Produce"), - Rebalance("Rebalance"); - - private final String metricName; - - Purgatory(String name) { - this.metricName = name; - } - - public String metricName() { - return metricName; - } - - public Collection fetch(MBeanClient mBeanClient) { - return mBeanClient - .queryBeans( - BeanQuery.builder() - .domainName("kafka.server") - .property("type", "DelayedOperationPurgatory") - .property("delayedOperation", metricName) - .property("name", "PurgatorySize") - .build()) - .stream() - .map(HasValue::of) - .collect(Collectors.toUnmodifiableList()); - } - - public int size(MBeanClient mBeanClient) { - return (int) - mBeanClient - .queryBean( - BeanQuery.builder() - .domainName("kafka.server") - .property("type", "DelayedOperationPurgatory") - .property("delayedOperation", this.name()) - .property("name", "PurgatorySize") - .build()) - .attributes() - .get("Value"); - } - } - public enum Request { Produce, FetchConsumer, diff --git a/app/src/main/java/org/astraea/app/metrics/broker/ServerMetrics.java b/app/src/main/java/org/astraea/app/metrics/broker/ServerMetrics.java new file mode 100644 index 0000000000..8af7a1ab7d --- /dev/null +++ b/app/src/main/java/org/astraea/app/metrics/broker/ServerMetrics.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.metrics.broker; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import org.astraea.app.metrics.jmx.BeanObject; +import org.astraea.app.metrics.jmx.BeanQuery; +import org.astraea.app.metrics.jmx.MBeanClient; + +public final class ServerMetrics { + + public enum DelayedOperationPurgatory { + AlterAcls("AlterAcls"), + DeleteRecords("DeleteRecords"), + ElectLeader("ElectLeader"), + Fetch("Fetch"), + Heartbeat("Heartbeat"), + Produce("Produce"), + Rebalance("Rebalance"); + + private final String metricName; + + DelayedOperationPurgatory(String name) { + this.metricName = name; + } + + public String metricName() { + return metricName; + } + + public Collection fetch(MBeanClient mBeanClient) { + return mBeanClient + .queryBeans( + BeanQuery.builder() + .domainName("kafka.server") + .property("type", "DelayedOperationPurgatory") + .property("delayedOperation", metricName) + .property("name", "PurgatorySize") + .build()) + .stream() + .map(Size::new) + .collect(Collectors.toUnmodifiableList()); + } + + public static DelayedOperationPurgatory of(String metricName) { + return Arrays.stream(DelayedOperationPurgatory.values()) + .filter(metric -> metric.metricName().equalsIgnoreCase(metricName)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No such metric: " + metricName)); + } + + public static class Size implements HasValue { + private final BeanObject beanObject; + + public Size(BeanObject beanObject) { + this.beanObject = beanObject; + } + + public String metricsName() { + return beanObject().properties().get("delayedOperation"); + } + + public DelayedOperationPurgatory type() { + return DelayedOperationPurgatory.of(metricsName()); + } + + @Override + public BeanObject beanObject() { + return beanObject; + } + } + } +} diff --git a/app/src/test/java/org/astraea/app/metrics/KafkaMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/KafkaMetricsTest.java index 75199eac99..5632e7d26f 100644 --- a/app/src/test/java/org/astraea/app/metrics/KafkaMetricsTest.java +++ b/app/src/test/java/org/astraea/app/metrics/KafkaMetricsTest.java @@ -108,13 +108,6 @@ void testRequestBrokerTopicMetrics(KafkaMetrics.BrokerTopic metric) { assertDoesNotThrow(result::rateUnit); } - @ParameterizedTest() - @EnumSource(value = KafkaMetrics.Purgatory.class) - void testPurgatorySize(KafkaMetrics.Purgatory request) { - // act assert type casting correct and field exists - assertDoesNotThrow(() -> request.size(mBeanClient)); - } - @ParameterizedTest() @EnumSource(value = KafkaMetrics.Request.class) void testRequestTotalTimeMs(KafkaMetrics.Request request) { diff --git a/app/src/test/java/org/astraea/app/metrics/broker/ServerMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/broker/ServerMetricsTest.java new file mode 100644 index 0000000000..b53126489f --- /dev/null +++ b/app/src/test/java/org/astraea/app/metrics/broker/ServerMetricsTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.metrics.broker; + +import org.astraea.app.metrics.jmx.MBeanClient; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class ServerMetricsTest { + + @ParameterizedTest() + @EnumSource(value = ServerMetrics.DelayedOperationPurgatory.class) + void testPurgatorySize(ServerMetrics.DelayedOperationPurgatory request) { + request.fetch(MBeanClient.local()).forEach(s -> Assertions.assertTrue(s.value() >= 0)); + } +}