From 3140fe88fac562425d383971c360f9d78e92b2f3 Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Sat, 6 Aug 2022 17:45:33 +0800 Subject: [PATCH 1/3] Add Consumer Node Metrics --- .../org/astraea/app/cost/NodeLatencyCost.java | 15 ++-- .../astraea/app/cost/NodeThroughputCost.java | 13 ++- .../HasNodeMetrics.java} | 8 +- .../client/consumer/ConsumerMetrics.java | 73 ++++++++++++++++ .../producer/HasProducerTopicMetrics.java | 2 +- .../producer/ProducerMetrics.java | 11 +-- .../astraea/app/cost/NodeLatencyCostTest.java | 8 +- .../app/cost/NodeThroughputCostTest.java | 10 +-- .../client/consumer/ConsumerMetricsTest.java | 83 +++++++++++++++++++ .../producer/HasProducerTopicMetricsTest.java | 2 +- .../producer/ProducerMetricsTest.java} | 11 +-- 11 files changed, 196 insertions(+), 40 deletions(-) rename app/src/main/java/org/astraea/app/metrics/{producer/HasProducerNodeMetrics.java => client/HasNodeMetrics.java} (91%) create mode 100644 app/src/main/java/org/astraea/app/metrics/client/consumer/ConsumerMetrics.java rename app/src/main/java/org/astraea/app/metrics/{ => client}/producer/HasProducerTopicMetrics.java (97%) rename app/src/main/java/org/astraea/app/metrics/{ => client}/producer/ProducerMetrics.java (87%) create mode 100644 app/src/test/java/org/astraea/app/metrics/client/consumer/ConsumerMetricsTest.java rename app/src/test/java/org/astraea/app/metrics/{ => client}/producer/HasProducerTopicMetricsTest.java (97%) rename app/src/test/java/org/astraea/app/metrics/{producer/HasProducerNodeMetricsTest.java => client/producer/ProducerMetricsTest.java} (92%) diff --git a/app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java b/app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java index 976e007117..b0e36ec150 100644 --- a/app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java +++ b/app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java @@ -23,9 +23,9 @@ import java.util.stream.Collectors; import org.astraea.app.admin.ClusterBean; import org.astraea.app.admin.ClusterInfo; +import org.astraea.app.metrics.client.HasNodeMetrics; +import org.astraea.app.metrics.client.producer.ProducerMetrics; import org.astraea.app.metrics.collector.Fetcher; -import org.astraea.app.metrics.producer.HasProducerNodeMetrics; -import org.astraea.app.metrics.producer.ProducerMetrics; public class NodeLatencyCost implements HasBrokerCost { @@ -34,10 +34,10 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { var result = clusterBean.all().values().stream() .flatMap(Collection::stream) - .filter(b -> b instanceof HasProducerNodeMetrics) - .map(b -> (HasProducerNodeMetrics) b) + .filter(b -> b instanceof HasNodeMetrics) + .map(b -> (HasNodeMetrics) b) .filter(b -> !Double.isNaN(b.requestLatencyAvg())) - .collect(Collectors.groupingBy(HasProducerNodeMetrics::brokerId)) + .collect(Collectors.groupingBy(HasNodeMetrics::brokerId)) .entrySet() .stream() .collect( @@ -46,10 +46,9 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { e -> e.getValue().stream() .sorted( - Comparator.comparing(HasProducerNodeMetrics::createdTimestamp) - .reversed()) + Comparator.comparing(HasNodeMetrics::createdTimestamp).reversed()) .limit(1) - .mapToDouble(HasProducerNodeMetrics::requestLatencyAvg) + .mapToDouble(HasNodeMetrics::requestLatencyAvg) .sum())); return () -> result; } diff --git a/app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java b/app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java index 6561d33f7a..c6ad88c159 100644 --- a/app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java +++ b/app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java @@ -23,9 +23,9 @@ import java.util.stream.Collectors; import org.astraea.app.admin.ClusterBean; import org.astraea.app.admin.ClusterInfo; +import org.astraea.app.metrics.client.HasNodeMetrics; +import org.astraea.app.metrics.client.producer.ProducerMetrics; import org.astraea.app.metrics.collector.Fetcher; -import org.astraea.app.metrics.producer.HasProducerNodeMetrics; -import org.astraea.app.metrics.producer.ProducerMetrics; public class NodeThroughputCost implements HasBrokerCost { @@ -34,10 +34,10 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { var result = clusterBean.all().values().stream() .flatMap(Collection::stream) - .filter(b -> b instanceof HasProducerNodeMetrics) - .map(b -> (HasProducerNodeMetrics) b) + .filter(b -> b instanceof HasNodeMetrics) + .map(b -> (HasNodeMetrics) b) .filter(b -> !Double.isNaN(b.incomingByteRate()) && !Double.isNaN(b.outgoingByteRate())) - .collect(Collectors.groupingBy(HasProducerNodeMetrics::brokerId)) + .collect(Collectors.groupingBy(HasNodeMetrics::brokerId)) .entrySet() .stream() .collect( @@ -46,8 +46,7 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { e -> e.getValue().stream() .sorted( - Comparator.comparing(HasProducerNodeMetrics::createdTimestamp) - .reversed()) + Comparator.comparing(HasNodeMetrics::createdTimestamp).reversed()) .limit(1) .mapToDouble(m -> m.incomingByteRate() + m.outgoingByteRate()) .sum())); diff --git a/app/src/main/java/org/astraea/app/metrics/producer/HasProducerNodeMetrics.java b/app/src/main/java/org/astraea/app/metrics/client/HasNodeMetrics.java similarity index 91% rename from app/src/main/java/org/astraea/app/metrics/producer/HasProducerNodeMetrics.java rename to app/src/main/java/org/astraea/app/metrics/client/HasNodeMetrics.java index 43945327ad..039a81ab1e 100644 --- a/app/src/main/java/org/astraea/app/metrics/producer/HasProducerNodeMetrics.java +++ b/app/src/main/java/org/astraea/app/metrics/client/HasNodeMetrics.java @@ -14,15 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.app.metrics.producer; +package org.astraea.app.metrics.client; import org.astraea.app.metrics.BeanObject; import org.astraea.app.metrics.HasBeanObject; -public interface HasProducerNodeMetrics extends HasBeanObject { +public interface HasNodeMetrics extends HasBeanObject { - static HasProducerNodeMetrics of(BeanObject beanObject, int brokerId) { - return new HasProducerNodeMetrics() { + static HasNodeMetrics of(BeanObject beanObject, int brokerId) { + return new HasNodeMetrics() { @Override public int brokerId() { return brokerId; diff --git a/app/src/main/java/org/astraea/app/metrics/client/consumer/ConsumerMetrics.java b/app/src/main/java/org/astraea/app/metrics/client/consumer/ConsumerMetrics.java new file mode 100644 index 0000000000..ee6c7c3862 --- /dev/null +++ b/app/src/main/java/org/astraea/app/metrics/client/consumer/ConsumerMetrics.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.metrics.client.consumer; + +import java.util.Collection; +import java.util.Map; +import java.util.stream.Collectors; +import org.astraea.app.metrics.BeanQuery; +import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.metrics.client.HasNodeMetrics; + +public class ConsumerMetrics { + private static int brokerId(String node) { + return Integer.parseInt(node.substring(node.indexOf("-") + 1)); + } + + /** + * node metrics traced by consumer + * + * @param mBeanClient to query beans + * @param brokerId broker ids + * @return key is client id used by consumer, and value is node metrics traced by each consumer + */ + public static Map node(MBeanClient mBeanClient, int brokerId) { + return mBeanClient + .queryBeans( + BeanQuery.builder() + .domainName("kafka.consumer") + .property("type", "consumer-node-metrics") + .property("node-id", "node-" + brokerId) + .property("client-id", "*") + .build()) + .stream() + .collect( + Collectors.toUnmodifiableMap( + b -> b.properties().get("client-id"), + b -> HasNodeMetrics.of(b, brokerId(b.properties().get("node-id"))))); + } + + /** + * collect HasNodeMetrics from all consumers. + * + * @param mBeanClient to query metrics + * @return key is broker id, and value is associated to broker metrics recorded by all consumers + */ + public static Collection nodes(MBeanClient mBeanClient) { + return mBeanClient + .queryBeans( + BeanQuery.builder() + .domainName("kafka.consumer") + .property("type", "consumer-node-metrics") + .property("node-id", "*") + .property("client-id", "*") + .build()) + .stream() + .map(b -> HasNodeMetrics.of(b, brokerId(b.properties().get("node-id")))) + .collect(Collectors.toUnmodifiableList()); + } +} diff --git a/app/src/main/java/org/astraea/app/metrics/producer/HasProducerTopicMetrics.java b/app/src/main/java/org/astraea/app/metrics/client/producer/HasProducerTopicMetrics.java similarity index 97% rename from app/src/main/java/org/astraea/app/metrics/producer/HasProducerTopicMetrics.java rename to app/src/main/java/org/astraea/app/metrics/client/producer/HasProducerTopicMetrics.java index 5b5358711d..47a3b0d879 100644 --- a/app/src/main/java/org/astraea/app/metrics/producer/HasProducerTopicMetrics.java +++ b/app/src/main/java/org/astraea/app/metrics/client/producer/HasProducerTopicMetrics.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.app.metrics.producer; +package org.astraea.app.metrics.client.producer; import org.astraea.app.metrics.HasBeanObject; diff --git a/app/src/main/java/org/astraea/app/metrics/producer/ProducerMetrics.java b/app/src/main/java/org/astraea/app/metrics/client/producer/ProducerMetrics.java similarity index 87% rename from app/src/main/java/org/astraea/app/metrics/producer/ProducerMetrics.java rename to app/src/main/java/org/astraea/app/metrics/client/producer/ProducerMetrics.java index c8ada5a61b..52a34627c4 100644 --- a/app/src/main/java/org/astraea/app/metrics/producer/ProducerMetrics.java +++ b/app/src/main/java/org/astraea/app/metrics/client/producer/ProducerMetrics.java @@ -14,13 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.app.metrics.producer; +package org.astraea.app.metrics.client.producer; import java.util.Collection; import java.util.Map; import java.util.stream.Collectors; import org.astraea.app.metrics.BeanQuery; import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.metrics.client.HasNodeMetrics; public final class ProducerMetrics { @@ -35,7 +36,7 @@ private static int brokerId(String node) { * @param brokerId broker ids * @return key is client id used by producer, and value is node metrics traced by each producer */ - public static Map node(MBeanClient mBeanClient, int brokerId) { + public static Map node(MBeanClient mBeanClient, int brokerId) { return mBeanClient .queryBeans( BeanQuery.builder() @@ -48,7 +49,7 @@ public static Map node(MBeanClient mBeanClient, .collect( Collectors.toUnmodifiableMap( b -> b.properties().get("client-id"), - b -> HasProducerNodeMetrics.of(b, brokerId(b.properties().get("node-id"))))); + b -> HasNodeMetrics.of(b, brokerId(b.properties().get("node-id"))))); } /** @@ -57,7 +58,7 @@ public static Map node(MBeanClient mBeanClient, * @param mBeanClient to query metrics * @return key is broker id, and value is associated to broker metrics recorded by all producers */ - public static Collection nodes(MBeanClient mBeanClient) { + public static Collection nodes(MBeanClient mBeanClient) { return mBeanClient .queryBeans( BeanQuery.builder() @@ -67,7 +68,7 @@ public static Collection nodes(MBeanClient mBeanClient) .property("client-id", "*") .build()) .stream() - .map(b -> HasProducerNodeMetrics.of(b, brokerId(b.properties().get("node-id")))) + .map(b -> HasNodeMetrics.of(b, brokerId(b.properties().get("node-id")))) .collect(Collectors.toUnmodifiableList()); } diff --git a/app/src/test/java/org/astraea/app/cost/NodeLatencyCostTest.java b/app/src/test/java/org/astraea/app/cost/NodeLatencyCostTest.java index bbb4ef899e..5473f47d7e 100644 --- a/app/src/test/java/org/astraea/app/cost/NodeLatencyCostTest.java +++ b/app/src/test/java/org/astraea/app/cost/NodeLatencyCostTest.java @@ -27,8 +27,8 @@ import org.astraea.app.metrics.BeanObject; import org.astraea.app.metrics.HasBeanObject; import org.astraea.app.metrics.MBeanClient; -import org.astraea.app.metrics.producer.HasProducerNodeMetrics; -import org.astraea.app.metrics.producer.ProducerMetrics; +import org.astraea.app.metrics.client.HasNodeMetrics; +import org.astraea.app.metrics.client.producer.ProducerMetrics; import org.astraea.app.producer.Producer; import org.astraea.app.service.RequireBrokerCluster; import org.junit.jupiter.api.Assertions; @@ -39,7 +39,7 @@ public class NodeLatencyCostTest extends RequireBrokerCluster { @Test void testNan() { - var bean = Mockito.mock(HasProducerNodeMetrics.class); + var bean = Mockito.mock(HasNodeMetrics.class); Mockito.when(bean.brokerId()).thenReturn(1); Mockito.when(bean.requestLatencyAvg()).thenReturn(Double.NaN); var clusterBean = ClusterBean.of(Map.of(-1, List.of(bean))); @@ -50,7 +50,7 @@ void testNan() { @Test void testBrokerId() { - var bean = Mockito.mock(HasProducerNodeMetrics.class); + var bean = Mockito.mock(HasNodeMetrics.class); Mockito.when(bean.brokerId()).thenReturn(1); Mockito.when(bean.requestLatencyAvg()).thenReturn(10D); var clusterBean = ClusterBean.of(Map.of(-1, List.of(bean))); diff --git a/app/src/test/java/org/astraea/app/cost/NodeThroughputCostTest.java b/app/src/test/java/org/astraea/app/cost/NodeThroughputCostTest.java index b7dde18d99..22bdc46f25 100644 --- a/app/src/test/java/org/astraea/app/cost/NodeThroughputCostTest.java +++ b/app/src/test/java/org/astraea/app/cost/NodeThroughputCostTest.java @@ -22,7 +22,7 @@ import org.astraea.app.admin.ClusterInfo; import org.astraea.app.metrics.BeanObject; import org.astraea.app.metrics.MBeanClient; -import org.astraea.app.metrics.producer.HasProducerNodeMetrics; +import org.astraea.app.metrics.client.HasNodeMetrics; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -31,7 +31,7 @@ public class NodeThroughputCostTest { @Test void testNan() { - var bean = Mockito.mock(HasProducerNodeMetrics.class); + var bean = Mockito.mock(HasNodeMetrics.class); Mockito.when(bean.brokerId()).thenReturn(1); Mockito.when(bean.incomingByteRate()).thenReturn(Double.NaN); Mockito.when(bean.outgoingByteRate()).thenReturn(Double.NaN); @@ -43,7 +43,7 @@ void testNan() { @Test void testBrokerId() { - var bean = Mockito.mock(HasProducerNodeMetrics.class); + var bean = Mockito.mock(HasNodeMetrics.class); Mockito.when(bean.brokerId()).thenReturn(1); Mockito.when(bean.incomingByteRate()).thenReturn(10D); Mockito.when(bean.outgoingByteRate()).thenReturn(10D); @@ -57,11 +57,11 @@ void testBrokerId() { @Test void testCost() { var throughputCost = new NodeThroughputCost(); - var bean0 = Mockito.mock(HasProducerNodeMetrics.class); + var bean0 = Mockito.mock(HasNodeMetrics.class); Mockito.when(bean0.incomingByteRate()).thenReturn(10D); Mockito.when(bean0.outgoingByteRate()).thenReturn(20D); Mockito.when(bean0.brokerId()).thenReturn(10); - var bean1 = Mockito.mock(HasProducerNodeMetrics.class); + var bean1 = Mockito.mock(HasNodeMetrics.class); Mockito.when(bean1.incomingByteRate()).thenReturn(2D); Mockito.when(bean1.outgoingByteRate()).thenReturn(3D); Mockito.when(bean1.brokerId()).thenReturn(11); diff --git a/app/src/test/java/org/astraea/app/metrics/client/consumer/ConsumerMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/client/consumer/ConsumerMetricsTest.java new file mode 100644 index 0000000000..e2936c3679 --- /dev/null +++ b/app/src/test/java/org/astraea/app/metrics/client/consumer/ConsumerMetricsTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.metrics.client.consumer; + +import java.time.Duration; +import java.util.Set; +import java.util.stream.Collectors; +import org.astraea.app.admin.Admin; +import org.astraea.app.admin.TopicPartition; +import org.astraea.app.common.Utils; +import org.astraea.app.consumer.Consumer; +import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.metrics.client.HasNodeMetrics; +import org.astraea.app.service.RequireBrokerCluster; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ConsumerMetricsTest extends RequireBrokerCluster { + @Test + void testSingleBroker() { + var topic = Utils.randomString(10); + try (var admin = Admin.of(bootstrapServers()); + var consumer = + Consumer.forTopics(Set.of(topic)).bootstrapServers(bootstrapServers()).build()) { + admin.creator().topic(topic).numberOfPartitions(1).create(); + Utils.sleep(Duration.ofSeconds(3)); + var owner = admin.replicas(Set.of(topic)).get(TopicPartition.of(topic, 0)).get(0).broker(); + consumer.poll(Duration.ofSeconds(5)); + var metrics = ConsumerMetrics.node(MBeanClient.local(), owner); + Assertions.assertEquals(1, metrics.size()); + check(metrics.values().stream().findAny().get()); + } + } + + @Test + void testMultiBrokers() { + var topic = Utils.randomString(10); + try (var admin = Admin.of(bootstrapServers()); + var consumer = + Consumer.forTopics(Set.of(topic)).bootstrapServers(bootstrapServers()).build()) { + admin.creator().topic(topic).numberOfPartitions(3).create(); + Utils.sleep(Duration.ofSeconds(3)); + consumer.poll(Duration.ofSeconds(5)); + var metrics = ConsumerMetrics.nodes(MBeanClient.local()); + Assertions.assertNotEquals(1, metrics.size()); + Assertions.assertTrue( + metrics.stream() + .map(HasNodeMetrics::brokerId) + .collect(Collectors.toUnmodifiableList()) + .containsAll(brokerIds())); + metrics.forEach(ConsumerMetricsTest::check); + } + } + + private static void check(HasNodeMetrics metrics) { + Assertions.assertNotEquals(0D, metrics.incomingByteRate()); + Assertions.assertNotEquals(0D, metrics.incomingByteTotal()); + Assertions.assertNotEquals(0D, metrics.outgoingByteRate()); + Assertions.assertNotEquals(0D, metrics.outgoingByteTotal()); + Assertions.assertEquals(Double.NaN, metrics.requestLatencyAvg()); + Assertions.assertEquals(Double.NaN, metrics.requestLatencyMax()); + Assertions.assertNotEquals(0D, metrics.requestRate()); + Assertions.assertNotEquals(0D, metrics.requestSizeAvg()); + Assertions.assertNotEquals(0D, metrics.requestSizeMax()); + Assertions.assertNotEquals(0D, metrics.requestTotal()); + Assertions.assertNotEquals(0D, metrics.responseRate()); + Assertions.assertNotEquals(0D, metrics.responseTotal()); + } +} diff --git a/app/src/test/java/org/astraea/app/metrics/producer/HasProducerTopicMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/client/producer/HasProducerTopicMetricsTest.java similarity index 97% rename from app/src/test/java/org/astraea/app/metrics/producer/HasProducerTopicMetricsTest.java rename to app/src/test/java/org/astraea/app/metrics/client/producer/HasProducerTopicMetricsTest.java index 8e2c905bc1..ac14a2ed31 100644 --- a/app/src/test/java/org/astraea/app/metrics/producer/HasProducerTopicMetricsTest.java +++ b/app/src/test/java/org/astraea/app/metrics/client/producer/HasProducerTopicMetricsTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.app.metrics.producer; +package org.astraea.app.metrics.client.producer; import java.util.concurrent.ExecutionException; import org.astraea.app.common.Utils; diff --git a/app/src/test/java/org/astraea/app/metrics/producer/HasProducerNodeMetricsTest.java b/app/src/test/java/org/astraea/app/metrics/client/producer/ProducerMetricsTest.java similarity index 92% rename from app/src/test/java/org/astraea/app/metrics/producer/HasProducerNodeMetricsTest.java rename to app/src/test/java/org/astraea/app/metrics/client/producer/ProducerMetricsTest.java index 9869b3df14..17cbca272a 100644 --- a/app/src/test/java/org/astraea/app/metrics/producer/HasProducerNodeMetricsTest.java +++ b/app/src/test/java/org/astraea/app/metrics/client/producer/ProducerMetricsTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.app.metrics.producer; +package org.astraea.app.metrics.client.producer; import java.time.Duration; import java.util.Set; @@ -24,12 +24,13 @@ import org.astraea.app.admin.TopicPartition; import org.astraea.app.common.Utils; import org.astraea.app.metrics.MBeanClient; +import org.astraea.app.metrics.client.HasNodeMetrics; import org.astraea.app.producer.Producer; import org.astraea.app.service.RequireBrokerCluster; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class HasProducerNodeMetricsTest extends RequireBrokerCluster { +public class ProducerMetricsTest extends RequireBrokerCluster { @Test void testSingleBroker() throws ExecutionException, InterruptedException { @@ -81,14 +82,14 @@ void testMultiBrokers() throws ExecutionException, InterruptedException { Assertions.assertNotEquals(1, metrics.size()); Assertions.assertTrue( metrics.stream() - .map(HasProducerNodeMetrics::brokerId) + .map(HasNodeMetrics::brokerId) .collect(Collectors.toUnmodifiableList()) .containsAll(brokerIds())); - metrics.forEach(HasProducerNodeMetricsTest::check); + metrics.forEach(ProducerMetricsTest::check); } } - private static void check(HasProducerNodeMetrics metrics) { + private static void check(HasNodeMetrics metrics) { Assertions.assertNotEquals(0D, metrics.incomingByteRate()); Assertions.assertNotEquals(0D, metrics.incomingByteTotal()); Assertions.assertNotEquals(0D, metrics.outgoingByteRate()); From 2e6041493fb49943aa4def52c1942ba0bf401349 Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Sun, 7 Aug 2022 15:47:20 +0800 Subject: [PATCH 2/3] Integrate NodeLatencyCost and NodeThroughputCost --- .../org/astraea/app/cost/NodeLatencyCost.java | 16 +------- .../org/astraea/app/cost/NodeMetricsCost.java | 41 +++++++++++++++++++ .../astraea/app/cost/NodeThroughputCost.java | 16 +------- 3 files changed, 45 insertions(+), 28 deletions(-) create mode 100644 app/src/main/java/org/astraea/app/cost/NodeMetricsCost.java diff --git a/app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java b/app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java index b0e36ec150..f8688b79cb 100644 --- a/app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java +++ b/app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java @@ -16,26 +16,19 @@ */ package org.astraea.app.cost; -import java.util.Collection; import java.util.Comparator; import java.util.Map; -import java.util.Optional; import java.util.stream.Collectors; import org.astraea.app.admin.ClusterBean; import org.astraea.app.admin.ClusterInfo; import org.astraea.app.metrics.client.HasNodeMetrics; -import org.astraea.app.metrics.client.producer.ProducerMetrics; -import org.astraea.app.metrics.collector.Fetcher; -public class NodeLatencyCost implements HasBrokerCost { +public class NodeLatencyCost extends NodeMetricsCost { @Override public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { var result = - clusterBean.all().values().stream() - .flatMap(Collection::stream) - .filter(b -> b instanceof HasNodeMetrics) - .map(b -> (HasNodeMetrics) b) + toHasNodeMetrics(clusterBean) .filter(b -> !Double.isNaN(b.requestLatencyAvg())) .collect(Collectors.groupingBy(HasNodeMetrics::brokerId)) .entrySet() @@ -52,9 +45,4 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { .sum())); return () -> result; } - - @Override - public Optional fetcher() { - return Optional.of(ProducerMetrics::nodes); - } } diff --git a/app/src/main/java/org/astraea/app/cost/NodeMetricsCost.java b/app/src/main/java/org/astraea/app/cost/NodeMetricsCost.java new file mode 100644 index 0000000000..1571b99feb --- /dev/null +++ b/app/src/main/java/org/astraea/app/cost/NodeMetricsCost.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.cost; + +import java.util.Collection; +import java.util.Optional; +import java.util.stream.Stream; +import org.astraea.app.admin.ClusterBean; +import org.astraea.app.metrics.client.HasNodeMetrics; +import org.astraea.app.metrics.client.producer.ProducerMetrics; +import org.astraea.app.metrics.collector.Fetcher; + +/** Calculate the cost by client-node-metrics. */ +public abstract class NodeMetricsCost implements HasBrokerCost { + /** Group clusterBean by broker id. */ + public static Stream toHasNodeMetrics(ClusterBean clusterBean) { + return clusterBean.all().values().stream() + .flatMap(Collection::stream) + .filter(b -> b instanceof HasNodeMetrics) + .map(b -> (HasNodeMetrics) b); + } + + @Override + public Optional fetcher() { + return Optional.of(ProducerMetrics::nodes); + } +} diff --git a/app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java b/app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java index c6ad88c159..f2a9009f08 100644 --- a/app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java +++ b/app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java @@ -16,26 +16,19 @@ */ package org.astraea.app.cost; -import java.util.Collection; import java.util.Comparator; import java.util.Map; -import java.util.Optional; import java.util.stream.Collectors; import org.astraea.app.admin.ClusterBean; import org.astraea.app.admin.ClusterInfo; import org.astraea.app.metrics.client.HasNodeMetrics; -import org.astraea.app.metrics.client.producer.ProducerMetrics; -import org.astraea.app.metrics.collector.Fetcher; -public class NodeThroughputCost implements HasBrokerCost { +public class NodeThroughputCost extends NodeMetricsCost { @Override public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { var result = - clusterBean.all().values().stream() - .flatMap(Collection::stream) - .filter(b -> b instanceof HasNodeMetrics) - .map(b -> (HasNodeMetrics) b) + toHasNodeMetrics(clusterBean) .filter(b -> !Double.isNaN(b.incomingByteRate()) && !Double.isNaN(b.outgoingByteRate())) .collect(Collectors.groupingBy(HasNodeMetrics::brokerId)) .entrySet() @@ -52,9 +45,4 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { .sum())); return () -> result; } - - @Override - public Optional fetcher() { - return Optional.of(ProducerMetrics::nodes); - } } From d7ce68db9d704edc00fa24716e65c38053f17c5d Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Sun, 7 Aug 2022 20:49:55 +0800 Subject: [PATCH 3/3] Reconstruct --- .../org/astraea/app/cost/NodeLatencyCost.java | 26 ++------------ .../org/astraea/app/cost/NodeMetricsCost.java | 36 +++++++++++++++---- .../astraea/app/cost/NodeThroughputCost.java | 26 ++------------ 3 files changed, 33 insertions(+), 55 deletions(-) diff --git a/app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java b/app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java index f8688b79cb..16f10229e3 100644 --- a/app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java +++ b/app/src/main/java/org/astraea/app/cost/NodeLatencyCost.java @@ -16,33 +16,11 @@ */ package org.astraea.app.cost; -import java.util.Comparator; -import java.util.Map; -import java.util.stream.Collectors; -import org.astraea.app.admin.ClusterBean; -import org.astraea.app.admin.ClusterInfo; import org.astraea.app.metrics.client.HasNodeMetrics; public class NodeLatencyCost extends NodeMetricsCost { - @Override - public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { - var result = - toHasNodeMetrics(clusterBean) - .filter(b -> !Double.isNaN(b.requestLatencyAvg())) - .collect(Collectors.groupingBy(HasNodeMetrics::brokerId)) - .entrySet() - .stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> - e.getValue().stream() - .sorted( - Comparator.comparing(HasNodeMetrics::createdTimestamp).reversed()) - .limit(1) - .mapToDouble(HasNodeMetrics::requestLatencyAvg) - .sum())); - return () -> result; + protected double value(HasNodeMetrics hasNodeMetrics) { + return hasNodeMetrics.requestLatencyAvg(); } } diff --git a/app/src/main/java/org/astraea/app/cost/NodeMetricsCost.java b/app/src/main/java/org/astraea/app/cost/NodeMetricsCost.java index 1571b99feb..1baa53c1b3 100644 --- a/app/src/main/java/org/astraea/app/cost/NodeMetricsCost.java +++ b/app/src/main/java/org/astraea/app/cost/NodeMetricsCost.java @@ -17,23 +17,45 @@ package org.astraea.app.cost; import java.util.Collection; +import java.util.Comparator; +import java.util.Map; import java.util.Optional; -import java.util.stream.Stream; +import java.util.stream.Collectors; import org.astraea.app.admin.ClusterBean; +import org.astraea.app.admin.ClusterInfo; import org.astraea.app.metrics.client.HasNodeMetrics; import org.astraea.app.metrics.client.producer.ProducerMetrics; import org.astraea.app.metrics.collector.Fetcher; /** Calculate the cost by client-node-metrics. */ public abstract class NodeMetricsCost implements HasBrokerCost { - /** Group clusterBean by broker id. */ - public static Stream toHasNodeMetrics(ClusterBean clusterBean) { - return clusterBean.all().values().stream() - .flatMap(Collection::stream) - .filter(b -> b instanceof HasNodeMetrics) - .map(b -> (HasNodeMetrics) b); + @Override + public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { + var result = + clusterBean.all().values().stream() + .flatMap(Collection::stream) + .filter(b -> b instanceof HasNodeMetrics) + .map(b -> (HasNodeMetrics) b) + .filter(b -> !Double.isNaN(value(b))) + .collect(Collectors.groupingBy(HasNodeMetrics::brokerId)) + .entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> + e.getValue().stream() + .sorted( + Comparator.comparing(HasNodeMetrics::createdTimestamp).reversed()) + .limit(1) + .mapToDouble(this::value) + .sum())); + return () -> result; } + /** The metrics to take into consider. */ + protected abstract double value(HasNodeMetrics hasNodeMetrics); + @Override public Optional fetcher() { return Optional.of(ProducerMetrics::nodes); diff --git a/app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java b/app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java index f2a9009f08..9e97d82125 100644 --- a/app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java +++ b/app/src/main/java/org/astraea/app/cost/NodeThroughputCost.java @@ -16,33 +16,11 @@ */ package org.astraea.app.cost; -import java.util.Comparator; -import java.util.Map; -import java.util.stream.Collectors; -import org.astraea.app.admin.ClusterBean; -import org.astraea.app.admin.ClusterInfo; import org.astraea.app.metrics.client.HasNodeMetrics; public class NodeThroughputCost extends NodeMetricsCost { - @Override - public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { - var result = - toHasNodeMetrics(clusterBean) - .filter(b -> !Double.isNaN(b.incomingByteRate()) && !Double.isNaN(b.outgoingByteRate())) - .collect(Collectors.groupingBy(HasNodeMetrics::brokerId)) - .entrySet() - .stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> - e.getValue().stream() - .sorted( - Comparator.comparing(HasNodeMetrics::createdTimestamp).reversed()) - .limit(1) - .mapToDouble(m -> m.incomingByteRate() + m.outgoingByteRate()) - .sum())); - return () -> result; + protected double value(HasNodeMetrics hasNodeMetrics) { + return hasNodeMetrics.incomingByteRate() + hasNodeMetrics.outgoingByteRate(); } }