From 381d91329c30ae1651f0275ad7defd2314e0baff Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Fri, 29 Jul 2022 22:40:51 +0800 Subject: [PATCH] Refactor broker topic metrics (#521) --- .../org/astraea/app/cost/BrokerInputCost.java | 9 +- .../astraea/app/cost/BrokerOutputCost.java | 9 +- .../java/org/astraea/app/cost/LoadCost.java | 15 +- .../org/astraea/app/metrics/KafkaMetrics.java | 124 --------------- .../broker/BrokerTopicMetricsResult.java | 48 ------ .../app/metrics/broker/ServerMetrics.java | 149 +++++++++++++++++- .../app/metrics/platform/HostMetrics.java | 23 +++ .../astraea/app/cost/BrokerInputCostTest.java | 22 +-- .../app/cost/BrokerOutputCostTest.java | 22 +-- .../org/astraea/app/cost/LoadCostTest.java | 19 ++- .../astraea/app/metrics/KafkaMetricsTest.java | 83 ---------- .../app/metrics/broker/ServerMetricsTest.java | 57 +++++++ .../broker/ServerTopicMetricsTest.java | 43 +++++ .../platform/LinuxDiskMetricsTest.java | 40 +++++ 14 files changed, 351 insertions(+), 312 deletions(-) delete mode 100644 app/src/main/java/org/astraea/app/metrics/broker/BrokerTopicMetricsResult.java create mode 100644 app/src/test/java/org/astraea/app/metrics/broker/ServerTopicMetricsTest.java create mode 100644 app/src/test/java/org/astraea/app/metrics/platform/LinuxDiskMetricsTest.java diff --git a/app/src/main/java/org/astraea/app/cost/BrokerInputCost.java b/app/src/main/java/org/astraea/app/cost/BrokerInputCost.java index b93d2ccee1..fd8aa7e467 100644 --- a/app/src/main/java/org/astraea/app/cost/BrokerInputCost.java +++ b/app/src/main/java/org/astraea/app/cost/BrokerInputCost.java @@ -22,8 +22,7 @@ import java.util.stream.Collectors; import org.astraea.app.admin.ClusterBean; import org.astraea.app.admin.ClusterInfo; -import org.astraea.app.metrics.KafkaMetrics; -import org.astraea.app.metrics.broker.BrokerTopicMetricsResult; +import org.astraea.app.metrics.broker.ServerMetrics; import org.astraea.app.metrics.collector.Fetcher; public class BrokerInputCost implements HasBrokerCost { @@ -35,14 +34,14 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { Collectors.toMap( Map.Entry::getKey, entry -> - KafkaMetrics.BrokerTopic.BytesInPerSec.of(entry.getValue()).stream() - .mapToDouble(BrokerTopicMetricsResult::oneMinuteRate) + ServerMetrics.Topic.BYTES_IN_PER_SEC.of(entry.getValue()).stream() + .mapToDouble(ServerMetrics.Topic.Meter::oneMinuteRate) .sum())); return () -> brokerCost; } @Override public Optional fetcher() { - return Optional.of(client -> List.of(KafkaMetrics.BrokerTopic.BytesInPerSec.fetch(client))); + return Optional.of(client -> List.of(ServerMetrics.Topic.BYTES_IN_PER_SEC.fetch(client))); } } diff --git a/app/src/main/java/org/astraea/app/cost/BrokerOutputCost.java b/app/src/main/java/org/astraea/app/cost/BrokerOutputCost.java index c4a7b35130..14acdd8588 100644 --- a/app/src/main/java/org/astraea/app/cost/BrokerOutputCost.java +++ b/app/src/main/java/org/astraea/app/cost/BrokerOutputCost.java @@ -22,8 +22,7 @@ import java.util.stream.Collectors; import org.astraea.app.admin.ClusterBean; import org.astraea.app.admin.ClusterInfo; -import org.astraea.app.metrics.KafkaMetrics; -import org.astraea.app.metrics.broker.BrokerTopicMetricsResult; +import org.astraea.app.metrics.broker.ServerMetrics; import org.astraea.app.metrics.collector.Fetcher; public class BrokerOutputCost implements HasBrokerCost { @@ -36,14 +35,14 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { Collectors.toMap( Map.Entry::getKey, entry -> - KafkaMetrics.BrokerTopic.BytesOutPerSec.of(entry.getValue()).stream() - .mapToDouble(BrokerTopicMetricsResult::oneMinuteRate) + ServerMetrics.Topic.BYTES_OUT_PER_SEC.of(entry.getValue()).stream() + .mapToDouble(ServerMetrics.Topic.Meter::oneMinuteRate) .sum())); return () -> brokerCost; } @Override public Optional fetcher() { - return Optional.of(client -> List.of(KafkaMetrics.BrokerTopic.BytesOutPerSec.fetch(client))); + return Optional.of(client -> List.of(ServerMetrics.Topic.BYTES_OUT_PER_SEC.fetch(client))); } } diff --git a/app/src/main/java/org/astraea/app/cost/LoadCost.java b/app/src/main/java/org/astraea/app/cost/LoadCost.java index 469e588923..ed44e03d02 100644 --- a/app/src/main/java/org/astraea/app/cost/LoadCost.java +++ b/app/src/main/java/org/astraea/app/cost/LoadCost.java @@ -26,8 +26,7 @@ import org.astraea.app.admin.ClusterBean; import org.astraea.app.admin.ClusterInfo; import org.astraea.app.metrics.HasBeanObject; -import org.astraea.app.metrics.KafkaMetrics; -import org.astraea.app.metrics.broker.BrokerTopicMetricsResult; +import org.astraea.app.metrics.broker.ServerMetrics; import org.astraea.app.metrics.collector.Fetcher; import org.astraea.app.partitioner.PartitionerUtils; @@ -35,9 +34,9 @@ public class LoadCost implements HasBrokerCost { private final Map brokersMetric = new HashMap<>(); private final Map metricNameAndWeight = Map.of( - KafkaMetrics.BrokerTopic.BytesInPerSec.metricName(), + ServerMetrics.Topic.BYTES_IN_PER_SEC.metricName(), 0.5, - KafkaMetrics.BrokerTopic.BytesOutPerSec.metricName(), + ServerMetrics.Topic.BYTES_OUT_PER_SEC.metricName(), 0.5); /** Do "Poisson" and "weightPoisson" calculation on "load". And change output to double. */ @@ -86,8 +85,8 @@ Map computeLoad(Map> allBea (brokerID, value) -> { if (!brokersMetric.containsKey(brokerID)) brokersMetric.put(brokerID, new BrokerMetric()); value.stream() - .filter(hasBeanObject -> hasBeanObject instanceof BrokerTopicMetricsResult) - .map(hasBeanObject -> (BrokerTopicMetricsResult) hasBeanObject) + .filter(hasBeanObject -> hasBeanObject instanceof ServerMetrics.Topic.Meter) + .map(hasBeanObject -> (ServerMetrics.Topic.Meter) hasBeanObject) .forEach( result -> brokersMetric @@ -151,8 +150,8 @@ public Optional fetcher() { return Optional.of( client -> List.of( - KafkaMetrics.BrokerTopic.BytesInPerSec.fetch(client), - KafkaMetrics.BrokerTopic.BytesOutPerSec.fetch(client))); + ServerMetrics.Topic.BYTES_IN_PER_SEC.fetch(client), + ServerMetrics.Topic.BYTES_OUT_PER_SEC.fetch(client))); } private static class BrokerMetric { diff --git a/app/src/main/java/org/astraea/app/metrics/KafkaMetrics.java b/app/src/main/java/org/astraea/app/metrics/KafkaMetrics.java index ba3dc5731a..8fd0cf741e 100644 --- a/app/src/main/java/org/astraea/app/metrics/KafkaMetrics.java +++ b/app/src/main/java/org/astraea/app/metrics/KafkaMetrics.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; -import org.astraea.app.metrics.broker.BrokerTopicMetricsResult; import org.astraea.app.metrics.broker.HasValue; import org.astraea.app.metrics.broker.TotalTimeMs; import org.astraea.app.metrics.producer.HasProducerNodeMetrics; @@ -31,129 +30,6 @@ public final class KafkaMetrics { private KafkaMetrics() {} - public enum BrokerTopic { - /** Message validation failure rate due to non-continuous offset or sequence number in batch */ - InvalidOffsetOrSequenceRecordsPerSec("InvalidOffsetOrSequenceRecordsPerSec"), - - /** Message validation failure rate due to incorrect crc checksum */ - InvalidMessageCrcRecordsPerSec("InvalidMessageCrcRecordsPerSec"), - - FetchMessageConversionsPerSec("FetchMessageConversionsPerSec"), - - BytesRejectedPerSec("BytesRejectedPerSec"), - - /** Message in rate */ - MessagesInPerSec("MessagesInPerSec"), - - /** Incoming byte rate of reassignment traffic */ - ReassignmentBytesInPerSec("ReassignmentBytesInPerSec"), - - FailedFetchRequestsPerSec("FailedFetchRequestsPerSec"), - - /** Byte in rate from other brokers */ - ReplicationBytesInPerSec("ReplicationBytesInPerSec"), - - /** Message validation failure rate due to no key specified for compacted topic */ - NoKeyCompactedTopicRecordsPerSec("NoKeyCompactedTopicRecordsPerSec"), - - TotalFetchRequestsPerSec("TotalFetchRequestsPerSec"), - - FailedProduceRequestsPerSec("FailedProduceRequestsPerSec"), - - /** Byte in rate from clients */ - BytesInPerSec("BytesInPerSec"), - - TotalProduceRequestsPerSec("TotalProduceRequestsPerSec"), - - /** Message validation failure rate due to invalid magic number */ - InvalidMagicNumberRecordsPerSec("InvalidMagicNumberRecordsPerSec"), - - /** Outgoing byte rate of reassignment traffic */ - ReassignmentBytesOutPerSec("ReassignmentBytesOutPerSec"), - - /** Bytes in rate from other brokers */ - ReplicationBytesOutPerSec("ReplicationBytesOutPerSec"), - - ProduceMessageConversionsPerSec("ProduceMessageConversionsPerSec"), - - /** Byte out rate to clients. */ - BytesOutPerSec("BytesOutPerSec"); - - private final String metricName; - - BrokerTopic(String name) { - this.metricName = name; - } - - public String metricName() { - return metricName; - } - - /** - * find out the objects related to this metrics. - * - * @param objects to search - * @return collection of BrokerTopicMetricsResult, or empty if all objects are not related to - * this metrics - */ - public Collection of(Collection objects) { - return objects.stream() - .filter(o -> o instanceof BrokerTopicMetricsResult) - .filter(o -> metricName().equals(o.beanObject().properties().get("name"))) - .map(o -> (BrokerTopicMetricsResult) o) - .collect(Collectors.toUnmodifiableList()); - } - - public BrokerTopicMetricsResult fetch(MBeanClient mBeanClient) { - return new BrokerTopicMetricsResult( - mBeanClient.queryBean( - BeanQuery.builder() - .domainName("kafka.server") - .property("type", "BrokerTopicMetrics") - .property("name", this.metricName()) - .build())); - } - - /** - * resolve specific {@link BrokerTopic} by the given metric string, compare by case insensitive - * - * @param metricName the metric to resolve - * @return a {@link BrokerTopic} match to give metric name - */ - public static BrokerTopic of(String metricName) { - return Arrays.stream(BrokerTopic.values()) - .filter(metric -> metric.metricName().equalsIgnoreCase(metricName)) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException("No such metric: " + metricName)); - } - - public static long linuxDiskReadBytes(MBeanClient mBeanClient) { - return (long) - mBeanClient - .queryBean( - BeanQuery.builder() - .domainName("kafka.server") - .property("type", "KafkaServer") - .property("name", "linux-disk-read-bytes") - .build()) - .attributes() - .get("Value"); - } - - public static long linuxDiskWriteBytes(MBeanClient mBeanClient) { - return (long) - mBeanClient - .queryBean( - BeanQuery.builder() - .domainName("kafka.server") - .property("type", "KafkaServer") - .property("name", "linux-disk-write-bytes") - .build()) - .attributes() - .get("Value"); - } - } - public enum Request { Produce, FetchConsumer, diff --git a/app/src/main/java/org/astraea/app/metrics/broker/BrokerTopicMetricsResult.java b/app/src/main/java/org/astraea/app/metrics/broker/BrokerTopicMetricsResult.java deleted file mode 100644 index 74bfb32e2a..0000000000 --- a/app/src/main/java/org/astraea/app/metrics/broker/BrokerTopicMetricsResult.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.app.metrics.broker; - -import java.util.Map; -import java.util.Objects; -import org.astraea.app.metrics.BeanObject; - -public class BrokerTopicMetricsResult implements HasCount, HasEventType, HasRate { - - private final BeanObject beanObject; - - public BrokerTopicMetricsResult(BeanObject beanObject) { - this.beanObject = Objects.requireNonNull(beanObject); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - for (Map.Entry e : beanObject().attributes().entrySet()) { - sb.append(System.lineSeparator()) - .append(" ") - .append(e.getKey()) - .append("=") - .append(e.getValue()); - } - return beanObject().properties().get("name") + "{" + sb + "}"; - } - - @Override - public BeanObject beanObject() { - return beanObject; - } -} diff --git a/app/src/main/java/org/astraea/app/metrics/broker/ServerMetrics.java b/app/src/main/java/org/astraea/app/metrics/broker/ServerMetrics.java index 303b55b953..305726d5ce 100644 --- a/app/src/main/java/org/astraea/app/metrics/broker/ServerMetrics.java +++ b/app/src/main/java/org/astraea/app/metrics/broker/ServerMetrics.java @@ -18,21 +18,24 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import org.astraea.app.metrics.BeanObject; import org.astraea.app.metrics.BeanQuery; +import org.astraea.app.metrics.HasBeanObject; import org.astraea.app.metrics.MBeanClient; public final class ServerMetrics { public enum DelayedOperationPurgatory { - AlterAcls("AlterAcls"), - DeleteRecords("DeleteRecords"), - ElectLeader("ElectLeader"), - Fetch("Fetch"), - Heartbeat("Heartbeat"), - Produce("Produce"), - Rebalance("Rebalance"); + ALTER_ACLS("AlterAcls"), + DELETE_RECORDS("DeleteRecords"), + ELECT_LEADER("ElectLeader"), + FETCH("Fetch"), + HEARTBEAT("Heartbeat"), + PRODUCE("Produce"), + REBALANCE("Rebalance"); private final String metricName; @@ -86,4 +89,136 @@ public BeanObject beanObject() { } } } + + public enum Topic { + /** Message validation failure rate due to non-continuous offset or sequence number in batch */ + INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC("InvalidOffsetOrSequenceRecordsPerSec"), + + /** Message validation failure rate due to incorrect crc checksum */ + INVALID_MESSAGE_CRC_RECORDS_PER_SEC("InvalidMessageCrcRecordsPerSec"), + + FETCH_MESSAGE_CONVERSIONS_PER_SEC("FetchMessageConversionsPerSec"), + + BYTES_REJECTED_PER_SEC("BytesRejectedPerSec"), + + /** Message in rate */ + MESSAGES_IN_PER_SEC("MessagesInPerSec"), + + /** Incoming byte rate of reassignment traffic */ + REASSIGNMENT_BYTES_IN_PER_SEC("ReassignmentBytesInPerSec"), + + FAILED_FETCH_REQUESTS_PER_SEC("FailedFetchRequestsPerSec"), + + /** Byte in rate from other brokers */ + REPLICATION_BYTES_IN_PER_SEC("ReplicationBytesInPerSec"), + + /** Message validation failure rate due to no key specified for compacted topic */ + NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC("NoKeyCompactedTopicRecordsPerSec"), + + TOTAL_FETCH_REQUESTS_PER_SEC("TotalFetchRequestsPerSec"), + + FAILED_PRODUCE_REQUESTS_PER_SEC("FailedProduceRequestsPerSec"), + + /** Byte in rate from clients */ + BYTES_IN_PER_SEC("BytesInPerSec"), + + TOTAL_PRODUCE_REQUESTS_PER_SEC("TotalProduceRequestsPerSec"), + + /** Message validation failure rate due to invalid magic number */ + INVALID_MAGIC_NUMBER_RECORDS_PER_SEC("InvalidMagicNumberRecordsPerSec"), + + /** Outgoing byte rate of reassignment traffic */ + REASSIGNMENT_BYTES_OUT_PER_SEC("ReassignmentBytesOutPerSec"), + + /** Bytes in rate from other brokers */ + REPLICATION_BYTES_OUT_PER_SEC("ReplicationBytesOutPerSec"), + + PRODUCE_MESSAGE_CONVERSIONS_PER_SEC("ProduceMessageConversionsPerSec"), + + /** Byte out rate to clients. */ + BYTES_OUT_PER_SEC("BytesOutPerSec"); + + private final String metricName; + + Topic(String name) { + this.metricName = name; + } + + public String metricName() { + return metricName; + } + + /** + * find out the objects related to this metrics. + * + * @param objects to search + * @return collection of BrokerTopicMetricsResult, or empty if all objects are not related to + * this metrics + */ + public Collection of(Collection objects) { + return objects.stream() + .filter(o -> o instanceof Meter) + .filter(o -> metricName().equals(o.beanObject().properties().get("name"))) + .map(o -> (Meter) o) + .collect(Collectors.toUnmodifiableList()); + } + + public Meter fetch(MBeanClient mBeanClient) { + return new Meter( + mBeanClient.queryBean( + BeanQuery.builder() + .domainName("kafka.server") + .property("type", "BrokerTopicMetrics") + .property("name", this.metricName()) + .build())); + } + + /** + * resolve specific {@link Topic} by the given metric string, compare by case-insensitive + * + * @param metricName the metric to resolve + * @return a {@link Topic} match to give metric name + */ + public static Topic of(String metricName) { + return Arrays.stream(Topic.values()) + .filter(metric -> metric.metricName().equalsIgnoreCase(metricName)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No such metric: " + metricName)); + } + + public static class Meter implements HasCount, HasEventType, HasRate { + + private final BeanObject beanObject; + + public Meter(BeanObject beanObject) { + this.beanObject = Objects.requireNonNull(beanObject); + } + + public String metricsName() { + return beanObject().properties().get("name"); + } + + public Topic type() { + return Topic.of(metricsName()); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (Map.Entry e : beanObject().attributes().entrySet()) { + sb.append(System.lineSeparator()) + .append(" ") + .append(e.getKey()) + .append("=") + .append(e.getValue()); + } + return beanObject().properties().get("name") + "{" + sb + "}"; + } + + @Override + public BeanObject beanObject() { + return beanObject; + } + } + } } diff --git a/app/src/main/java/org/astraea/app/metrics/platform/HostMetrics.java b/app/src/main/java/org/astraea/app/metrics/platform/HostMetrics.java index 1b7937fa0f..2aaf0c1fab 100644 --- a/app/src/main/java/org/astraea/app/metrics/platform/HostMetrics.java +++ b/app/src/main/java/org/astraea/app/metrics/platform/HostMetrics.java @@ -18,6 +18,7 @@ import org.astraea.app.metrics.BeanQuery; import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.metrics.broker.HasValue; public final class HostMetrics { @@ -36,5 +37,27 @@ public static JvmMemory jvmMemory(MBeanClient mBeanClient) { BeanQuery.builder().domainName("java.lang").property("type", "Memory").build())); } + public static HasValue linuxDiskReadBytes(MBeanClient mBeanClient) { + var bean = + mBeanClient.queryBean( + BeanQuery.builder() + .domainName("kafka.server") + .property("type", "KafkaServer") + .property("name", "linux-disk-read-bytes") + .build()); + return () -> bean; + } + + public static HasValue linuxDiskWriteBytes(MBeanClient mBeanClient) { + var bean = + mBeanClient.queryBean( + BeanQuery.builder() + .domainName("kafka.server") + .property("type", "KafkaServer") + .property("name", "linux-disk-write-bytes") + .build()); + return () -> bean; + } + private HostMetrics() {} } diff --git a/app/src/test/java/org/astraea/app/cost/BrokerInputCostTest.java b/app/src/test/java/org/astraea/app/cost/BrokerInputCostTest.java index 95c43e3e0a..eb08553718 100644 --- a/app/src/test/java/org/astraea/app/cost/BrokerInputCostTest.java +++ b/app/src/test/java/org/astraea/app/cost/BrokerInputCostTest.java @@ -21,8 +21,7 @@ import org.astraea.app.admin.ClusterBean; import org.astraea.app.admin.ClusterInfo; import org.astraea.app.metrics.BeanObject; -import org.astraea.app.metrics.KafkaMetrics; -import org.astraea.app.metrics.broker.BrokerTopicMetricsResult; +import org.astraea.app.metrics.broker.ServerMetrics; import org.astraea.app.metrics.collector.BeanCollector; import org.astraea.app.metrics.collector.Receiver; import org.astraea.app.service.RequireBrokerCluster; @@ -41,11 +40,11 @@ void testCost() { ClusterBean.of( Map.of( 1, - List.of(brokerTopicMetricsResult(10000D)), + List.of(meter(10000D)), 2, - List.of(brokerTopicMetricsResult(20000D)), + List.of(meter(20000D)), 3, - List.of(brokerTopicMetricsResult(5000D))))) + List.of(meter(5000D))))) .value(); Assertions.assertEquals(10000D, scores.get(1)); Assertions.assertEquals(20000D, scores.get(2)); @@ -69,23 +68,24 @@ void testFetcher() { receiver.current().stream() .allMatch( o -> - (o instanceof BrokerTopicMetricsResult) - && (KafkaMetrics.BrokerTopic.BytesInPerSec.metricName() + (o instanceof ServerMetrics.Topic.Meter) + && (ServerMetrics.Topic.BYTES_IN_PER_SEC + .metricName() .equals(o.beanObject().properties().get("name"))))); // Test the fetched object's value. Assertions.assertTrue( receiver.current().stream() - .map(o -> (BrokerTopicMetricsResult) o) + .map(o -> (ServerMetrics.Topic.Meter) o) .allMatch(result -> result.count() == 0)); } } - private static BrokerTopicMetricsResult brokerTopicMetricsResult(double value) { - return new BrokerTopicMetricsResult( + private static ServerMetrics.Topic.Meter meter(double value) { + return new ServerMetrics.Topic.Meter( new BeanObject( "object", - Map.of("name", KafkaMetrics.BrokerTopic.BytesInPerSec.metricName()), + Map.of("name", ServerMetrics.Topic.BYTES_IN_PER_SEC.metricName()), Map.of("OneMinuteRate", value))); } } diff --git a/app/src/test/java/org/astraea/app/cost/BrokerOutputCostTest.java b/app/src/test/java/org/astraea/app/cost/BrokerOutputCostTest.java index 0e736aa3c6..092aaa521c 100644 --- a/app/src/test/java/org/astraea/app/cost/BrokerOutputCostTest.java +++ b/app/src/test/java/org/astraea/app/cost/BrokerOutputCostTest.java @@ -21,8 +21,7 @@ import org.astraea.app.admin.ClusterBean; import org.astraea.app.admin.ClusterInfo; import org.astraea.app.metrics.BeanObject; -import org.astraea.app.metrics.KafkaMetrics; -import org.astraea.app.metrics.broker.BrokerTopicMetricsResult; +import org.astraea.app.metrics.broker.ServerMetrics; import org.astraea.app.metrics.collector.BeanCollector; import org.astraea.app.metrics.collector.Receiver; import org.astraea.app.service.RequireBrokerCluster; @@ -41,11 +40,11 @@ void testCost() { ClusterBean.of( Map.of( 1, - List.of(brokerTopicMetricsResult(10000D)), + List.of(meter(10000D)), 2, - List.of(brokerTopicMetricsResult(20000D)), + List.of(meter(20000D)), 3, - List.of(brokerTopicMetricsResult(5000D))))) + List.of(meter(5000D))))) .value(); Assertions.assertEquals(10000D, scores.get(1)); Assertions.assertEquals(20000D, scores.get(2)); @@ -69,23 +68,24 @@ void testFetcher() { receiver.current().stream() .allMatch( o -> - (o instanceof BrokerTopicMetricsResult) - && (KafkaMetrics.BrokerTopic.BytesOutPerSec.metricName() + (o instanceof ServerMetrics.Topic.Meter) + && (ServerMetrics.Topic.BYTES_OUT_PER_SEC + .metricName() .equals(o.beanObject().properties().get("name"))))); // Test the fetched object's value. Assertions.assertTrue( receiver.current().stream() - .map(o -> (BrokerTopicMetricsResult) o) + .map(o -> (ServerMetrics.Topic.Meter) o) .allMatch(result -> result.count() == 0)); } } - private static BrokerTopicMetricsResult brokerTopicMetricsResult(double value) { - return new BrokerTopicMetricsResult( + private static ServerMetrics.Topic.Meter meter(double value) { + return new ServerMetrics.Topic.Meter( new BeanObject( "object", - Map.of("name", KafkaMetrics.BrokerTopic.BytesOutPerSec.metricName()), + Map.of("name", ServerMetrics.Topic.BYTES_OUT_PER_SEC.metricName()), Map.of("OneMinuteRate", value))); } } diff --git a/app/src/test/java/org/astraea/app/cost/LoadCostTest.java b/app/src/test/java/org/astraea/app/cost/LoadCostTest.java index 508d165400..8ec340c2e8 100644 --- a/app/src/test/java/org/astraea/app/cost/LoadCostTest.java +++ b/app/src/test/java/org/astraea/app/cost/LoadCostTest.java @@ -22,8 +22,7 @@ import org.astraea.app.admin.ClusterBean; import org.astraea.app.metrics.BeanObject; import org.astraea.app.metrics.HasBeanObject; -import org.astraea.app.metrics.KafkaMetrics; -import org.astraea.app.metrics.broker.BrokerTopicMetricsResult; +import org.astraea.app.metrics.broker.ServerMetrics; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -48,12 +47,12 @@ void testComputeLoad() { } private ClusterBean clusterBean() { - var BytesInPerSec1 = mockResult(KafkaMetrics.BrokerTopic.BytesInPerSec.metricName(), 50000L); - var BytesInPerSec2 = mockResult(KafkaMetrics.BrokerTopic.BytesInPerSec.metricName(), 100000L); - var BytesInPerSec3 = mockResult(KafkaMetrics.BrokerTopic.BytesInPerSec.metricName(), 200000L); - var BytesOutPerSec1 = mockResult(KafkaMetrics.BrokerTopic.BytesOutPerSec.metricName(), 210L); - var BytesOutPerSec2 = mockResult(KafkaMetrics.BrokerTopic.BytesOutPerSec.metricName(), 20L); - var BytesOutPerSec3 = mockResult(KafkaMetrics.BrokerTopic.BytesOutPerSec.metricName(), 10L); + var BytesInPerSec1 = mockResult(ServerMetrics.Topic.BYTES_IN_PER_SEC.metricName(), 50000L); + var BytesInPerSec2 = mockResult(ServerMetrics.Topic.BYTES_IN_PER_SEC.metricName(), 100000L); + var BytesInPerSec3 = mockResult(ServerMetrics.Topic.BYTES_IN_PER_SEC.metricName(), 200000L); + var BytesOutPerSec1 = mockResult(ServerMetrics.Topic.BYTES_OUT_PER_SEC.metricName(), 210L); + var BytesOutPerSec2 = mockResult(ServerMetrics.Topic.BYTES_OUT_PER_SEC.metricName(), 20L); + var BytesOutPerSec3 = mockResult(ServerMetrics.Topic.BYTES_OUT_PER_SEC.metricName(), 10L); Collection broker1 = List.of(BytesInPerSec1, BytesOutPerSec1); Collection broker2 = List.of(BytesInPerSec2, BytesOutPerSec2); @@ -61,8 +60,8 @@ private ClusterBean clusterBean() { return ClusterBean.of(Map.of(1, broker1, 2, broker2, 3, broker3)); } - private BrokerTopicMetricsResult mockResult(String name, long count) { - var result = Mockito.mock(BrokerTopicMetricsResult.class); + private ServerMetrics.Topic.Meter mockResult(String name, long count) { + var result = Mockito.mock(ServerMetrics.Topic.Meter.class); var bean = Mockito.mock(BeanObject.class); Mockito.when(result.beanObject()).thenReturn(bean); Mockito.when(bean.properties()).thenReturn(Map.of("name", name)); diff --git a/app/src/test/java/org/astraea/app/metrics/KafkaMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/KafkaMetricsTest.java index a75fc34653..726a5ba4db 100644 --- a/app/src/test/java/org/astraea/app/metrics/KafkaMetricsTest.java +++ b/app/src/test/java/org/astraea/app/metrics/KafkaMetricsTest.java @@ -17,34 +17,22 @@ package org.astraea.app.metrics; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.condition.OS.LINUX; import java.io.IOException; import java.lang.management.ManagementFactory; import java.time.Duration; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import javax.management.MBeanServer; import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXConnectorServerFactory; import javax.management.remote.JMXServiceURL; import org.astraea.app.admin.Admin; import org.astraea.app.common.Utils; -import org.astraea.app.metrics.broker.BrokerTopicMetricsResult; import org.astraea.app.metrics.broker.LogMetrics; import org.astraea.app.metrics.broker.TotalTimeMs; import org.astraea.app.service.RequireBrokerCluster; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.EnabledOnOs; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -73,35 +61,6 @@ void tearDown() throws Exception { mBeanClient.close(); } - @Test - void testAllEnumNameUnique() { - // arrange act - Set collectedName = - Arrays.stream(KafkaMetrics.BrokerTopic.values()) - .map(KafkaMetrics.BrokerTopic::metricName) - .collect(Collectors.toSet()); - - // assert - assertEquals(KafkaMetrics.BrokerTopic.values().length, collectedName.size()); - } - - @ParameterizedTest - @EnumSource(value = KafkaMetrics.BrokerTopic.class) - void testRequestBrokerTopicMetrics(KafkaMetrics.BrokerTopic metric) { - // act - BrokerTopicMetricsResult result = metric.fetch(mBeanClient); - - // assert access attribute will not throw casting error - // assert attribute actually exists - assertDoesNotThrow(result::count); - assertDoesNotThrow(result::eventType); - assertDoesNotThrow(result::fifteenMinuteRate); - assertDoesNotThrow(result::fiveMinuteRate); - assertDoesNotThrow(result::meanRate); - assertDoesNotThrow(result::oneMinuteRate); - assertDoesNotThrow(result::rateUnit); - } - @ParameterizedTest() @EnumSource(value = KafkaMetrics.Request.class) void testRequestTotalTimeMs(KafkaMetrics.Request request) { @@ -136,46 +95,4 @@ void testTopicPartitionMetrics(LogMetrics.Log request) { var beans = request.fetch(mBeanClient); assertNotEquals(0, beans.size()); } - - @Test - void testKafkaMetricsOf() { - assertEquals( - KafkaMetrics.BrokerTopic.BytesInPerSec, KafkaMetrics.BrokerTopic.of("ByTeSiNpErSeC")); - assertEquals( - KafkaMetrics.BrokerTopic.BytesOutPerSec, KafkaMetrics.BrokerTopic.of("bytesoutpersec")); - assertEquals( - KafkaMetrics.BrokerTopic.MessagesInPerSec, KafkaMetrics.BrokerTopic.of("MessagesInPERSEC")); - assertThrows(IllegalArgumentException.class, () -> KafkaMetrics.BrokerTopic.of("nothing")); - } - - @Test - @EnabledOnOs(LINUX) - void linuxDiskReadBytes() { - assertDoesNotThrow(() -> KafkaMetrics.BrokerTopic.linuxDiskReadBytes(mBeanClient)); - } - - @Test - @EnabledOnOs(LINUX) - void linuxDiskWriteBytes() { - assertDoesNotThrow(() -> KafkaMetrics.BrokerTopic.linuxDiskWriteBytes(mBeanClient)); - } - - @ParameterizedTest - @EnumSource(KafkaMetrics.BrokerTopic.class) - void testBrokerTopic(KafkaMetrics.BrokerTopic brokerTopic) { - var object = - new BrokerTopicMetricsResult( - new BeanObject("object", Map.of("name", brokerTopic.metricName()), Map.of())); - Assertions.assertEquals(1, brokerTopic.of(List.of(object)).size()); - - Assertions.assertEquals( - 0, - brokerTopic - .of( - List.of( - new BrokerTopicMetricsResult( - new BeanObject( - "object", Map.of("name", Utils.randomString(10)), Map.of())))) - .size()); - } } diff --git a/app/src/test/java/org/astraea/app/metrics/broker/ServerMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/broker/ServerMetricsTest.java index d1362fb560..9d0d808fa4 100644 --- a/app/src/test/java/org/astraea/app/metrics/broker/ServerMetricsTest.java +++ b/app/src/test/java/org/astraea/app/metrics/broker/ServerMetricsTest.java @@ -16,8 +16,19 @@ */ package org.astraea.app.metrics.broker; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.astraea.app.common.Utils; +import org.astraea.app.metrics.BeanObject; import org.astraea.app.metrics.MBeanClient; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -28,4 +39,50 @@ public class ServerMetricsTest { void testPurgatorySize(ServerMetrics.DelayedOperationPurgatory request) { request.fetch(MBeanClient.local()).forEach(s -> Assertions.assertTrue(s.value() >= 0)); } + + @Test + void testKafkaMetricsOf() { + Arrays.stream(ServerMetrics.Topic.values()) + .forEach( + t -> + Assertions.assertEquals( + t, ServerMetrics.Topic.of(t.metricName().toLowerCase(Locale.ROOT)))); + Arrays.stream(ServerMetrics.Topic.values()) + .forEach( + t -> + Assertions.assertEquals( + t, ServerMetrics.Topic.of(t.metricName().toUpperCase(Locale.ROOT)))); + assertThrows(IllegalArgumentException.class, () -> ServerMetrics.Topic.of("nothing")); + } + + @ParameterizedTest + @EnumSource(ServerMetrics.Topic.class) + void testBrokerTopic(ServerMetrics.Topic brokerTopic) { + var object = + new ServerMetrics.Topic.Meter( + new BeanObject("object", Map.of("name", brokerTopic.metricName()), Map.of())); + Assertions.assertEquals(1, brokerTopic.of(List.of(object)).size()); + + Assertions.assertEquals( + 0, + brokerTopic + .of( + List.of( + new ServerMetrics.Topic.Meter( + new BeanObject( + "object", Map.of("name", Utils.randomString(10)), Map.of())))) + .size()); + } + + @Test + void testAllEnumNameUnique() { + // arrange act + Set collectedName = + Arrays.stream(ServerMetrics.Topic.values()) + .map(ServerMetrics.Topic::metricName) + .collect(Collectors.toSet()); + + // assert + Assertions.assertEquals(ServerMetrics.Topic.values().length, collectedName.size()); + } } diff --git a/app/src/test/java/org/astraea/app/metrics/broker/ServerTopicMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/broker/ServerTopicMetricsTest.java new file mode 100644 index 0000000000..e8c694428d --- /dev/null +++ b/app/src/test/java/org/astraea/app/metrics/broker/ServerTopicMetricsTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.metrics.broker; + +import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.service.RequireSingleBrokerCluster; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class ServerTopicMetricsTest extends RequireSingleBrokerCluster { + + @ParameterizedTest + @EnumSource(value = ServerMetrics.Topic.class) + void testRequestBrokerTopicMetrics(ServerMetrics.Topic metric) { + // act + var result = metric.fetch(MBeanClient.local()); + + // assert access attribute will not throw casting error + // assert attribute actually exists + Assertions.assertDoesNotThrow(result::count); + Assertions.assertDoesNotThrow(result::eventType); + Assertions.assertDoesNotThrow(result::fifteenMinuteRate); + Assertions.assertDoesNotThrow(result::fiveMinuteRate); + Assertions.assertDoesNotThrow(result::meanRate); + Assertions.assertDoesNotThrow(result::oneMinuteRate); + Assertions.assertDoesNotThrow(result::rateUnit); + } +} diff --git a/app/src/test/java/org/astraea/app/metrics/platform/LinuxDiskMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/platform/LinuxDiskMetricsTest.java new file mode 100644 index 0000000000..652f68f894 --- /dev/null +++ b/app/src/test/java/org/astraea/app/metrics/platform/LinuxDiskMetricsTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.metrics.platform; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.condition.OS.LINUX; + +import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.service.RequireSingleBrokerCluster; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledOnOs; + +public class LinuxDiskMetricsTest extends RequireSingleBrokerCluster { + + @Test + @EnabledOnOs(LINUX) + void linuxDiskReadBytes() { + assertDoesNotThrow(() -> HostMetrics.linuxDiskReadBytes(MBeanClient.local())); + } + + @Test + @EnabledOnOs(LINUX) + void linuxDiskWriteBytes() { + assertDoesNotThrow(() -> HostMetrics.linuxDiskWriteBytes(MBeanClient.local())); + } +}