From 9920523b0b20f0badb7614c9edb818427b3adc67 Mon Sep 17 00:00:00 2001 From: Zhi-Mao Teng Date: Wed, 15 Mar 2023 17:13:09 +0800 Subject: [PATCH 1/3] Provide exception handler construction for MetricSensors. --- .../common/cost/NeutralIntegratedCost.java | 8 +++--- .../metrics/collector/MetricSensor.java | 27 +++++++++++++++++++ .../metrics/collector/MetricSensorTest.java | 27 +++++++++++++++++++ 3 files changed, 58 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/astraea/common/cost/NeutralIntegratedCost.java b/common/src/main/java/org/astraea/common/cost/NeutralIntegratedCost.java index c23d71ee71..a414f17341 100644 --- a/common/src/main/java/org/astraea/common/cost/NeutralIntegratedCost.java +++ b/common/src/main/java/org/astraea/common/cost/NeutralIntegratedCost.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.astraea.common.EnumInfo; import org.astraea.common.admin.ClusterBean; @@ -166,12 +167,11 @@ static Map weight( @Override public Optional metricSensor() { + Consumer e = (x) -> {}; return MetricSensor.of( metricsCost.stream() - .map(CostFunction::metricSensor) - .filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toUnmodifiableList())); + .map(c -> Map.entry(c.metricSensor(), e)) + .collect(Collectors.toUnmodifiableMap(x -> x.getKey().get(), Map.Entry::getValue))); } @Override diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java index 97d87521d6..77f251b206 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java @@ -17,8 +17,12 @@ package org.astraea.common.metrics.collector; import java.util.Collection; +import java.util.Map; +import java.util.NoSuchElementException; import java.util.Optional; +import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.astraea.common.admin.ClusterBean; import org.astraea.common.metrics.HasBeanObject; import org.astraea.common.metrics.MBeanClient; @@ -40,6 +44,29 @@ static Optional of(Collection metricSensors) { .collect(Collectors.toUnmodifiableList())); } + /** + * merge all sensors and their exception handler into single one + * + * @param metricSensors cost function and exception handler + * @return sensor if there is available sensor. Otherwise, empty is returned + */ + static Optional of(Map> metricSensors) { + if (metricSensors.isEmpty()) return Optional.empty(); + return Optional.of( + (client, clusterBean) -> + metricSensors.entrySet().stream() + .flatMap( + e -> { + var sensor = e.getKey(); + try { + return sensor.fetch(client, clusterBean).stream(); + } catch (NoSuchElementException ex) { + e.getValue().accept(ex); + return Stream.empty(); + } + }) + .collect(Collectors.toUnmodifiableList())); + } /** * generate the metrics to stored by metrics collector. The implementation can use MBeanClient to * fetch metrics from remote/local mbean server. Or the implementation can generate custom metrics diff --git a/common/src/test/java/org/astraea/common/metrics/collector/MetricSensorTest.java b/common/src/test/java/org/astraea/common/metrics/collector/MetricSensorTest.java index 6608980ee4..422a5f3b67 100644 --- a/common/src/test/java/org/astraea/common/metrics/collector/MetricSensorTest.java +++ b/common/src/test/java/org/astraea/common/metrics/collector/MetricSensorTest.java @@ -17,6 +17,8 @@ package org.astraea.common.metrics.collector; import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; import java.util.Optional; import org.astraea.common.admin.ClusterBean; import org.astraea.common.metrics.HasBeanObject; @@ -61,4 +63,29 @@ void testNoSwallowException() { RuntimeException.class, () -> sensor.fetch(Mockito.mock(MBeanClient.class), ClusterBean.EMPTY)); } + + @Test + void testSensorsWithExceptionHandler() { + var mbean0 = Mockito.mock(HasBeanObject.class); + MetricSensor metricSensor0 = (client, ignored) -> List.of(mbean0); + MetricSensor metricSensor1 = + (client, ignored) -> { + throw new NoSuchElementException(); + }; + MetricSensor metricSensor2 = + (client, ignored) -> { + throw new RuntimeException(); + }; + + var sensor = MetricSensor.of(Map.of(metricSensor0, e -> {}, metricSensor1, e -> {})).get(); + Assertions.assertDoesNotThrow( + () -> sensor.fetch(Mockito.mock(MBeanClient.class), ClusterBean.EMPTY)); + + Assertions.assertThrows( + RuntimeException.class, + () -> + MetricSensor.of(Map.of(metricSensor0, e -> {}, metricSensor2, e -> {})) + .get() + .fetch(Mockito.mock(MBeanClient.class), ClusterBean.EMPTY)); + } } From bcf89af7a0d65dac2a6511bc0ce0d2c06b150d08 Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Thu, 16 Mar 2023 00:57:15 +0800 Subject: [PATCH 2/3] Modify the parameters and test --- .../common/cost/NeutralIntegratedCost.java | 8 ++++---- .../metrics/collector/MetricSensor.java | 19 +++++++------------ .../metrics/collector/MetricSensorTest.java | 7 ++++--- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/common/src/main/java/org/astraea/common/cost/NeutralIntegratedCost.java b/common/src/main/java/org/astraea/common/cost/NeutralIntegratedCost.java index a414f17341..c23d71ee71 100644 --- a/common/src/main/java/org/astraea/common/cost/NeutralIntegratedCost.java +++ b/common/src/main/java/org/astraea/common/cost/NeutralIntegratedCost.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Consumer; import java.util.stream.Collectors; import org.astraea.common.EnumInfo; import org.astraea.common.admin.ClusterBean; @@ -167,11 +166,12 @@ static Map weight( @Override public Optional metricSensor() { - Consumer e = (x) -> {}; return MetricSensor.of( metricsCost.stream() - .map(c -> Map.entry(c.metricSensor(), e)) - .collect(Collectors.toUnmodifiableMap(x -> x.getKey().get(), Map.Entry::getValue))); + .map(CostFunction::metricSensor) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toUnmodifiableList())); } @Override diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java index 77f251b206..ecf60ec044 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java @@ -17,7 +17,6 @@ package org.astraea.common.metrics.collector; import java.util.Collection; -import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; import java.util.function.Consumer; @@ -37,11 +36,7 @@ public interface MetricSensor { */ static Optional of(Collection metricSensors) { if (metricSensors.isEmpty()) return Optional.empty(); - return Optional.of( - (client, clusterBean) -> - metricSensors.stream() - .flatMap(f -> f.fetch(client, clusterBean).stream()) - .collect(Collectors.toUnmodifiableList())); + return of(metricSensors, (ex) -> {}); } /** @@ -50,18 +45,18 @@ static Optional of(Collection metricSensors) { * @param metricSensors cost function and exception handler * @return sensor if there is available sensor. Otherwise, empty is returned */ - static Optional of(Map> metricSensors) { + static Optional of( + Collection metricSensors, Consumer exceptionHandler) { if (metricSensors.isEmpty()) return Optional.empty(); return Optional.of( (client, clusterBean) -> - metricSensors.entrySet().stream() + metricSensors.stream() .flatMap( - e -> { - var sensor = e.getKey(); + ms -> { try { - return sensor.fetch(client, clusterBean).stream(); + return ms.fetch(client, clusterBean).stream(); } catch (NoSuchElementException ex) { - e.getValue().accept(ex); + exceptionHandler.accept(ex); return Stream.empty(); } }) diff --git a/common/src/test/java/org/astraea/common/metrics/collector/MetricSensorTest.java b/common/src/test/java/org/astraea/common/metrics/collector/MetricSensorTest.java index 422a5f3b67..12a926e073 100644 --- a/common/src/test/java/org/astraea/common/metrics/collector/MetricSensorTest.java +++ b/common/src/test/java/org/astraea/common/metrics/collector/MetricSensorTest.java @@ -17,7 +17,6 @@ package org.astraea.common.metrics.collector; import java.util.List; -import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; import org.astraea.common.admin.ClusterBean; @@ -77,14 +76,16 @@ void testSensorsWithExceptionHandler() { throw new RuntimeException(); }; - var sensor = MetricSensor.of(Map.of(metricSensor0, e -> {}, metricSensor1, e -> {})).get(); + var sensor = MetricSensor.of(List.of(metricSensor0, metricSensor1)).get(); Assertions.assertDoesNotThrow( () -> sensor.fetch(Mockito.mock(MBeanClient.class), ClusterBean.EMPTY)); + Assertions.assertEquals( + 1, sensor.fetch(Mockito.mock(MBeanClient.class), ClusterBean.EMPTY).size()); Assertions.assertThrows( RuntimeException.class, () -> - MetricSensor.of(Map.of(metricSensor0, e -> {}, metricSensor2, e -> {})) + MetricSensor.of(List.of(metricSensor0, metricSensor2)) .get() .fetch(Mockito.mock(MBeanClient.class), ClusterBean.EMPTY)); } From e506d755aea339c4c4e440f3905cf7a4c3e03ee3 Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Sat, 18 Mar 2023 22:26:01 +0800 Subject: [PATCH 3/3] Fix style and change exception type --- .../org/astraea/common/metrics/collector/MetricSensor.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java index ecf60ec044..2afedac949 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java @@ -17,7 +17,6 @@ package org.astraea.common.metrics.collector; import java.util.Collection; -import java.util.NoSuchElementException; import java.util.Optional; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -36,7 +35,7 @@ public interface MetricSensor { */ static Optional of(Collection metricSensors) { if (metricSensors.isEmpty()) return Optional.empty(); - return of(metricSensors, (ex) -> {}); + return of(metricSensors, ignore -> {}); } /** @@ -55,7 +54,7 @@ static Optional of( ms -> { try { return ms.fetch(client, clusterBean).stream(); - } catch (NoSuchElementException ex) { + } catch (Exception ex) { exceptionHandler.accept(ex); return Stream.empty(); }