diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index f859b500..123391a4 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -15,6 +15,7 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.FaultDetectionMetricsCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterThrottlingMetricsCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardStateCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; @@ -24,6 +25,8 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.http_action.config.PerformanceAnalyzerOverridesClusterConfigAction; import com.amazon.opendistro.elasticsearch.performanceanalyzer.http_action.config.PerformanceAnalyzerResourceProvider; import java.io.File; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -55,6 +58,7 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexModule; @@ -107,6 +111,9 @@ public final class PerformanceAnalyzerPlugin extends Plugin implements ActionPlugin, NetworkPlugin, SearchPlugin { private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerPlugin.class); public static final String PLUGIN_NAME = "opendistro_performance_analyzer"; + private static final String ADD_FAULT_DETECTION_METHOD = "addFaultDetectionListener"; + private static final String LISTENER_INJECTOR_CLASS_PATH = + "com.amazon.opendistro.elasticsearch.performanceanalyzer.listener.ListenerInjector"; public static final int QUEUE_PURGE_INTERVAL_MS = 1000; private static SecurityManager sm = null; private final PerformanceAnalyzerClusterSettingHandler perfAnalyzerClusterSettingHandler; @@ -125,6 +132,8 @@ public final class PerformanceAnalyzerPlugin extends Plugin implements ActionPlu } } + + public static void invokePrivileged(Runnable runner) { AccessController.doPrivileged((PrivilegedAction) () -> { try { @@ -200,6 +209,8 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new DisksCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new NetworkInterfaceCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector(StatsCollector.instance()); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new FaultDetectionMetricsCollector( + performanceAnalyzerController, configOverridesWrapper)); scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new ShardStateCollector( performanceAnalyzerController,configOverridesWrapper)); scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new MasterThrottlingMetricsCollector( @@ -235,6 +246,22 @@ public void onIndexModule(IndexModule indexModule) { indexModule.addSearchOperationListener(performanceanalyzerSearchListener); } + //follower check, leader check + public void onDiscovery(Discovery discovery) { + try { + Class listenerInjector = Class.forName(LISTENER_INJECTOR_CLASS_PATH); + Object listenerInjectorInstance = listenerInjector.getDeclaredConstructor().newInstance(); + Method addListenerMethod = listenerInjectorInstance.getClass().getMethod(ADD_FAULT_DETECTION_METHOD, + Discovery.class); + addListenerMethod.invoke(listenerInjectorInstance, discovery); + } catch (InstantiationException | InvocationTargetException | NoSuchMethodException | + IllegalAccessException e) { + LOG.debug("Exception while calling addFaultDetectionListener in Discovery"); + } catch (ClassNotFoundException e) { + LOG.debug("No Class for ListenerInjector detected"); + } + } + //- shardbulk @Override public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) { @@ -301,4 +328,3 @@ public List> getSettings() { } } - diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollector.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollector.java new file mode 100644 index 00000000..e98b831d --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollector.java @@ -0,0 +1,185 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsProcessor; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.WriterMetrics; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jooq.tools.StringUtils; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics.addMetricEntry; + +public class FaultDetectionMetricsCollector extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor { + public static final int SAMPLING_TIME_INTERVAL = MetricsConfiguration.CONFIG_MAP. + get(FaultDetectionMetricsCollector.class).samplingInterval; + private static final int KEYS_PATH_LENGTH = 3; + private static final Logger LOG = LogManager.getLogger(FaultDetectionMetricsCollector.class); + private static final String FAULT_DETECTION_HANDLER_NAME = + "com.amazon.opendistro.elasticsearch.performanceanalyzer.handler.ClusterFaultDetectionStatsHandler"; + private static final String FAULT_DETECTION_HANDLER_METRIC_QUEUE = "metricQueue"; + private final ConfigOverridesWrapper configOverridesWrapper; + private final PerformanceAnalyzerController controller; + private StringBuilder value; + private static final ObjectMapper mapper = new ObjectMapper(); + + public FaultDetectionMetricsCollector(PerformanceAnalyzerController controller, + ConfigOverridesWrapper configOverridesWrapper) { + super(SAMPLING_TIME_INTERVAL, "FaultDetectionMetricsCollector"); + value = new StringBuilder(); + this.configOverridesWrapper = configOverridesWrapper; + this.controller = controller; + } + + @Override + @SuppressWarnings("unchecked") + void collectMetrics(long startTime) { + if(!controller.isCollectorEnabled(configOverridesWrapper, getCollectorName())) { + return; + } + long mCurrT = System.currentTimeMillis(); + Class faultDetectionHandler = null; + try { + faultDetectionHandler = Class.forName(FAULT_DETECTION_HANDLER_NAME); + } catch (ClassNotFoundException e) { + LOG.debug("No Handler Detected for Fault Detection. Skipping FaultDetectionMetricsCollector"); + return; + } + try { + BlockingQueue metricQueue = (BlockingQueue) + getFaultDetectionHandlerMetricsQueue(faultDetectionHandler).get(null); + List metrics = new ArrayList<>(); + metricQueue.drainTo(metrics); + + List faultDetectionContextsList = new ArrayList<>(); + for(String metric : metrics) { + faultDetectionContextsList.add(mapper.readValue(metric, ClusterFaultDetectionContext.class)); + } + + for(ClusterFaultDetectionContext clusterFaultDetectionContext : faultDetectionContextsList) { + value.setLength(0); + value.append(PerformanceAnalyzerMetrics.getCurrentTimeMetric()); + addMetricEntry(value, AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID + .toString(), clusterFaultDetectionContext.getSourceNodeId()); + addMetricEntry(value, AllMetrics.FaultDetectionDimension.TARGET_NODE_ID + .toString(), clusterFaultDetectionContext.getTargetNodeId()); + + if(StringUtils.isEmpty(clusterFaultDetectionContext.getStartTime())) { + addMetricEntry(value, AllMetrics.CommonMetric.FINISH_TIME.toString(), + clusterFaultDetectionContext.getFinishTime()); + addMetricEntry(value, PerformanceAnalyzerMetrics.FAULT, + clusterFaultDetectionContext.getFault()); + saveMetricValues(value.toString(), startTime, clusterFaultDetectionContext.getType(), + clusterFaultDetectionContext.getRequestId(), PerformanceAnalyzerMetrics.FINISH_FILE_NAME); + } else { + addMetricEntry(value, AllMetrics.CommonMetric.START_TIME.toString(), + clusterFaultDetectionContext.getStartTime()); + saveMetricValues(value.toString(), startTime, clusterFaultDetectionContext.getType(), + clusterFaultDetectionContext.getRequestId(), PerformanceAnalyzerMetrics.START_FILE_NAME); + } + } + PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat( + WriterMetrics.FAULT_DETECTION_COLLECTOR_EXECUTION_TIME, "", + System.currentTimeMillis() - mCurrT); + } catch (Exception ex) { + PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat( + ExceptionsAndErrors.FAULT_DETECTION_COLLECTOR_ERROR, "", + System.currentTimeMillis() - mCurrT); + LOG.debug("Exception in Collecting FaultDetection Metrics: {} for startTime {}", + () -> ex.toString(), () -> startTime); + } + } + + Field getFaultDetectionHandlerMetricsQueue(Class faultDetectionHandler) throws Exception { + Field metricsQueue = faultDetectionHandler.getDeclaredField(FAULT_DETECTION_HANDLER_METRIC_QUEUE); + metricsQueue.setAccessible(true); + return metricsQueue; + } + + /** Sample Event + * ^fault_detection/follower_check/7627/finish + * current_time:1601486201861 + * SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a + * TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a + * FinishTime:1566413987986 + * fault:0$ + * + * @param startTime time at which collector is called + * @param keysPath List of string that would make up the metrics path + * @return metric path + */ + @Override + public String getMetricsPath(long startTime, String... keysPath) { + if (keysPath.length != KEYS_PATH_LENGTH) { + throw new RuntimeException("keys length should be " + KEYS_PATH_LENGTH); + } + + return PerformanceAnalyzerMetrics.generatePath(startTime, PerformanceAnalyzerMetrics.sFaultDetection, + keysPath[0], keysPath[1], keysPath[2]); + } + + public static class ClusterFaultDetectionContext { + String type; + String sourceNodeId; + String targetNodeId; + String requestId; + String fault; + String startTime; + String finishTime; + + public String getType() { + return this.type; + } + + public String getSourceNodeId() { + return this.sourceNodeId; + } + + public String getTargetNodeId() { + return this.targetNodeId; + } + + public String getFault() { + return this.fault; + } + + public String getStartTime() { + return this.startTime; + } + + public String getFinishTime() { + return this.finishTime; + } + + public String getRequestId() { + return this.requestId; + } + + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java index a265b8e2..e4d4cbbb 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/util/Utils.java @@ -17,6 +17,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources; import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CacheConfigMetricsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.FaultDetectionMetricsCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterThrottlingMetricsCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardStateCollector; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; @@ -53,6 +54,7 @@ public static void configureMetrics() { MetricsConfiguration.CONFIG_MAP.put(NodeStatsFixedShardsMetricsCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(MasterServiceEventMetrics.class, new MetricsConfiguration.MetricConfig(1000, 0, 0)); MetricsConfiguration.CONFIG_MAP.put(MasterServiceMetrics.class, cdefault); + MetricsConfiguration.CONFIG_MAP.put(FaultDetectionMetricsCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(ShardStateCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(MasterThrottlingMetricsCollector.class, cdefault); } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollectorTest.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollectorTest.java new file mode 100644 index 00000000..72ae2dde --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/FaultDetectionMetricsCollectorTest.java @@ -0,0 +1,75 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.CustomMetricsLocationTestBase; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PluginSettings; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; +import com.fasterxml.jackson.databind.cfg.ConfigOverrides; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class FaultDetectionMetricsCollectorTest extends CustomMetricsLocationTestBase { + @Test + public void testShardsStateMetrics() { + MetricsConfiguration.CONFIG_MAP.put(FaultDetectionMetricsCollector.class, MetricsConfiguration.cdefault); + System.setProperty("performanceanalyzer.metrics.log.enabled", "False"); + long startTimeInMills = 1153721339; + PerformanceAnalyzerController controller = Mockito.mock(PerformanceAnalyzerController.class); + ConfigOverridesWrapper configOverrides = Mockito.mock(ConfigOverridesWrapper.class); + Mockito.when(controller.isCollectorEnabled(configOverrides, "FaultDetectionMetricsCollector")) + .thenReturn(true); + FaultDetectionMetricsCollector faultDetectionMetricsCollector = new FaultDetectionMetricsCollector( + controller, configOverrides); + faultDetectionMetricsCollector.saveMetricValues("fault_detection", startTimeInMills, + "follower_check", "65432", "start"); + String fetchedValue = PerformanceAnalyzerMetrics.getMetric(PluginSettings.instance().getMetricsLocation() + + PerformanceAnalyzerMetrics.getTimeInterval(startTimeInMills)+"/fault_detection/"); + PerformanceAnalyzerMetrics.removeMetrics(PluginSettings.instance().getMetricsLocation() + + PerformanceAnalyzerMetrics.getTimeInterval(startTimeInMills)); + assertEquals("fault_detection", fetchedValue); + + try { + faultDetectionMetricsCollector.saveMetricValues("fault_detection_metrics", startTimeInMills); + assertTrue("Negative scenario test: Should have been a RuntimeException", true); + } catch (RuntimeException ex) { + //- expecting exception...0 values passed; 3 expected + } + + try { + faultDetectionMetricsCollector.saveMetricValues("fault_detection_metrics", startTimeInMills, + "leader_check"); + assertTrue("Negative scenario test: Should have been a RuntimeException", true); + } catch (RuntimeException ex) { + //- expecting exception...1 values passed; 3 expected + } + + try { + faultDetectionMetricsCollector.saveMetricValues("fault_detection_metrics", startTimeInMills, + "leader_check", "823765423"); + assertTrue("Negative scenario test: Should have been a RuntimeException", true); + } catch (RuntimeException ex) { + //- expecting exception...2 values passed; 0 expected + } + } +} +