From b5ce3b003ad271aeff6f68905dc2ed9b38b187e7 Mon Sep 17 00:00:00 2001 From: amathur1893 Date: Fri, 23 Oct 2020 03:46:37 +0530 Subject: [PATCH] Publish shard state metrics (#212) * Publish shard state metrics * Publish Shard State Metrics2 * Publish Shard State Metrics * Publish Shard State Metrics * Publish Shard State Metrics * Publish Shard State Metrics * Publish Shard State Metrics Co-authored-by: Arpita --- .../PerformanceAnalyzerPlugin.java | 3 + .../collectors/ShardStateCollector.java | 164 ++++++++++++++++++ .../config/PerformanceAnalyzerController.java | 11 ++ .../performanceanalyzer/util/Utils.java | 3 +- .../collectors/ShardStateCollectorTest.java | 57 ++++++ 5 files changed, 237 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ShardStateCollector.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ShardStateCollectorTest.java diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index ea218014..49b16a87 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -15,6 +15,7 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardStateCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.setting.handler.ConfigOverridesClusterSettingHandler; import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CacheConfigMetricsCollector; @@ -198,6 +199,8 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new DisksCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new NetworkInterfaceCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector(StatsCollector.instance()); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new ShardStateCollector( + performanceAnalyzerController,configOverridesWrapper)); scheduledMetricCollectorsExecutor.start(); EventLog eventLog = new EventLog(); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ShardStateCollector.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ShardStateCollector.java new file mode 100644 index 00000000..1d35cae1 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ShardStateCollector.java @@ -0,0 +1,164 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsProcessor; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.WriterMetrics; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.jooq.tools.StringUtils; +import org.jooq.tools.json.JSONObject; + +import java.util.List; + +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ShardType.SHARD_PRIMARY; +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ShardType.SHARD_REPLICA; + +public class ShardStateCollector extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor { + public static final int SAMPLING_TIME_INTERVAL = MetricsConfiguration.CONFIG_MAP.get(ShardStateCollector.class).samplingInterval; + private static final Logger LOG = LogManager.getLogger(ShardStateCollector.class); + private static final int KEYS_PATH_LENGTH = 0; + private final ConfigOverridesWrapper configOverridesWrapper; + private final PerformanceAnalyzerController controller; + private StringBuilder value; + + public ShardStateCollector(PerformanceAnalyzerController controller, + ConfigOverridesWrapper configOverridesWrapper) { + super(SAMPLING_TIME_INTERVAL, "ShardsStateCollector"); + value = new StringBuilder(); + this.controller = controller; + this.configOverridesWrapper = configOverridesWrapper; + } + + @Override + void collectMetrics( long startTime) { + if(!controller.isCollectorEnabled(configOverridesWrapper, getCollectorName())) { + return; + } + long mCurrT = System.currentTimeMillis(); + if (ESResources.INSTANCE.getClusterService() == null) { + return; + } + ClusterState clusterState = ESResources.INSTANCE.getClusterService().state(); + boolean inActiveShard = false; + try { + value.setLength(0); + value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()) + .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); + RoutingTable routingTable = clusterState.routingTable(); + String[] indices = routingTable.indicesRouting().keys().toArray(String.class); + for (String index : indices) { + List allShardsIndex = routingTable.allShards(index); + value.append(createJsonObject(AllMetrics.ShardStateDimension.INDEX_NAME.toString(), index)); + for (ShardRouting shard :allShardsIndex) { + String nodeName = StringUtils.EMPTY; + if (shard.assignedToNode()) { + nodeName = clusterState.nodes().get(shard.currentNodeId()).getName(); + } + if (shard.state() != ShardRoutingState.STARTED) { + inActiveShard = true; + value + .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) + .append(new ShardStateMetrics( + shard.getId(), + shard.primary() ? SHARD_PRIMARY.toString() : SHARD_REPLICA.toString(), + nodeName, + shard.state().name()) + .serialize()); + + } + } + } + value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); + if(inActiveShard) { + saveMetricValues(value.toString(), startTime); + } + PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat( + WriterMetrics.SHARD_STATE_COLLECTOR_EXECUTION_TIME, "", + System.currentTimeMillis() - mCurrT); + } catch (Exception ex) { + PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat( + ExceptionsAndErrors.SHARD_STATE_COLLECTOR_ERROR, "", 1); + LOG.debug("Exception in Collecting Shard Metrics: {} for startTime {}", () -> ex.toString(), + () -> startTime); + } + } + + @SuppressWarnings("unchecked") + private String createJsonObject(String key, String value) { + JSONObject json = new JSONObject(); + json.put(key,value); + return json.toString(); + } + + @Override + public String getMetricsPath(long startTime, String... keysPath) { + if (keysPath.length != KEYS_PATH_LENGTH) { + throw new RuntimeException("keys length should be " + KEYS_PATH_LENGTH); + } + + return PerformanceAnalyzerMetrics.generatePath(startTime, PerformanceAnalyzerMetrics.sShardStatePath); + } + + public static class ShardStateMetrics extends MetricStatus { + + private final int shardId; + private final String shardType; + private final String nodeName; + private final String shardState; + + public ShardStateMetrics(int shardId, String shardType, String nodeName, String shardState) { + this.shardId = shardId; + this.shardType = shardType; + this.nodeName = nodeName; + this.shardState = shardState; + } + + @JsonProperty(AllMetrics.CommonDimension.Constants.SHARDID_VALUE) + public int getShardId() { + return shardId; + } + + @JsonProperty(AllMetrics.ShardStateDimension.Constants.SHARD_TYPE) + public String getShardType() { + return shardType; + } + + @JsonProperty(AllMetrics.ShardStateDimension.Constants.NODE_NAME) + public String getNodeName() { + return nodeName; + } + + @JsonProperty(AllMetrics.ShardStateValue.Constants.SHARD_STATE) + public String getShardState() { + return shardState; + } + } + +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/config/PerformanceAnalyzerController.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/config/PerformanceAnalyzerController.java index b752a96d..4478d439 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/config/PerformanceAnalyzerController.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/config/PerformanceAnalyzerController.java @@ -5,8 +5,10 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; import java.util.Scanner; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -257,4 +259,13 @@ private void saveStateToConf(boolean featureEnabled, String fileName) { } }); } + + public boolean isCollectorEnabled(ConfigOverridesWrapper configOverridesWrapper, String collectorName) { + if(configOverridesWrapper == null) { + return false; + } + List enabledCollectorsList = configOverridesWrapper.getCurrentClusterConfigOverrides().getEnable() + .getCollectors(); + return enabledCollectorsList.contains(collectorName) ? true: false; + } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java index 4c5b09e3..f59edbc7 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java @@ -17,6 +17,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources; import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CacheConfigMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardStateCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CircuitBreakerCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterServiceEventMetrics; @@ -51,7 +52,7 @@ public static void configureMetrics() { MetricsConfiguration.CONFIG_MAP.put(NodeStatsFixedShardsMetricsCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(MasterServiceEventMetrics.class, new MetricsConfiguration.MetricConfig(1000, 0, 0)); MetricsConfiguration.CONFIG_MAP.put(MasterServiceMetrics.class, cdefault); - + MetricsConfiguration.CONFIG_MAP.put(ShardStateCollector.class, cdefault); } // These methods are utility functions for the Node Stat Metrics Collectors. These methods are used by both the all diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ShardStateCollectorTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ShardStateCollectorTest.java new file mode 100644 index 00000000..9239d3d7 --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ShardStateCollectorTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.CustomMetricsLocationTestBase; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PluginSettings; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ShardStateCollectorTest extends CustomMetricsLocationTestBase { + + @Test + public void testShardsStateMetrics() { + MetricsConfiguration.CONFIG_MAP.put(ShardStateCollector.class, MetricsConfiguration.cdefault); + System.setProperty("performanceanalyzer.metrics.log.enabled", "False"); + long startTimeInMills = 1153721339; + PerformanceAnalyzerController controller = Mockito.mock(PerformanceAnalyzerController.class); + ConfigOverridesWrapper configOverrides = Mockito.mock(ConfigOverridesWrapper.class); + Mockito.when(controller.isCollectorEnabled(configOverrides, "ShardStateCollector")) + .thenReturn(true); + ShardStateCollector shardsStateCollector = new ShardStateCollector(controller, configOverrides); + shardsStateCollector.saveMetricValues("shard_state_metrics", startTimeInMills); + String fetchedValue = PerformanceAnalyzerMetrics.getMetric(PluginSettings.instance().getMetricsLocation() + + PerformanceAnalyzerMetrics.getTimeInterval(startTimeInMills)+"/shard_state_metrics/"); + PerformanceAnalyzerMetrics.removeMetrics(PluginSettings.instance().getMetricsLocation() + + PerformanceAnalyzerMetrics.getTimeInterval(startTimeInMills)); + assertEquals("shard_state_metrics", fetchedValue); + + try { + shardsStateCollector.saveMetricValues("shard_state_metrics", startTimeInMills, "123"); + assertTrue("Negative scenario test: Should have been a RuntimeException", true); + } catch (RuntimeException ex) { + //- expecting exception...1 values passed; 0 expected + } + } +} +