diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java index 97d87521d6..2afedac949 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricSensor.java @@ -18,7 +18,9 @@ import java.util.Collection; import java.util.Optional; +import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.astraea.common.admin.ClusterBean; import org.astraea.common.metrics.HasBeanObject; import org.astraea.common.metrics.MBeanClient; @@ -33,13 +35,32 @@ public interface MetricSensor { */ static Optional of(Collection metricSensors) { if (metricSensors.isEmpty()) return Optional.empty(); + return of(metricSensors, ignore -> {}); + } + + /** + * merge all sensors and their exception handler into single one + * + * @param metricSensors cost function and exception handler + * @return sensor if there is available sensor. Otherwise, empty is returned + */ + static Optional of( + Collection metricSensors, Consumer exceptionHandler) { + if (metricSensors.isEmpty()) return Optional.empty(); return Optional.of( (client, clusterBean) -> metricSensors.stream() - .flatMap(f -> f.fetch(client, clusterBean).stream()) + .flatMap( + ms -> { + try { + return ms.fetch(client, clusterBean).stream(); + } catch (Exception ex) { + exceptionHandler.accept(ex); + return Stream.empty(); + } + }) .collect(Collectors.toUnmodifiableList())); } - /** * generate the metrics to stored by metrics collector. The implementation can use MBeanClient to * fetch metrics from remote/local mbean server. Or the implementation can generate custom metrics diff --git a/common/src/test/java/org/astraea/common/metrics/collector/MetricSensorTest.java b/common/src/test/java/org/astraea/common/metrics/collector/MetricSensorTest.java index 6608980ee4..12a926e073 100644 --- a/common/src/test/java/org/astraea/common/metrics/collector/MetricSensorTest.java +++ b/common/src/test/java/org/astraea/common/metrics/collector/MetricSensorTest.java @@ -17,6 +17,7 @@ package org.astraea.common.metrics.collector; import java.util.List; +import java.util.NoSuchElementException; import java.util.Optional; import org.astraea.common.admin.ClusterBean; import org.astraea.common.metrics.HasBeanObject; @@ -61,4 +62,31 @@ void testNoSwallowException() { RuntimeException.class, () -> sensor.fetch(Mockito.mock(MBeanClient.class), ClusterBean.EMPTY)); } + + @Test + void testSensorsWithExceptionHandler() { + var mbean0 = Mockito.mock(HasBeanObject.class); + MetricSensor metricSensor0 = (client, ignored) -> List.of(mbean0); + MetricSensor metricSensor1 = + (client, ignored) -> { + throw new NoSuchElementException(); + }; + MetricSensor metricSensor2 = + (client, ignored) -> { + throw new RuntimeException(); + }; + + var sensor = MetricSensor.of(List.of(metricSensor0, metricSensor1)).get(); + Assertions.assertDoesNotThrow( + () -> sensor.fetch(Mockito.mock(MBeanClient.class), ClusterBean.EMPTY)); + Assertions.assertEquals( + 1, sensor.fetch(Mockito.mock(MBeanClient.class), ClusterBean.EMPTY).size()); + + Assertions.assertThrows( + RuntimeException.class, + () -> + MetricSensor.of(List.of(metricSensor0, metricSensor2)) + .get() + .fetch(Mockito.mock(MBeanClient.class), ClusterBean.EMPTY)); + } }