Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Publish fault detection metrics (#218)
Browse files Browse the repository at this point in the history
* Publish Fault Detection Metrics

* Publish Fault Detection Metrics2

* Publish Fault Detection Metrics

* Publish Fault Detection Metrics

* Publish Fault Detection Metrics

* Publish Fault Detection Metrics

* Publish Fault Detection Metrics

Co-authored-by: Arpita <[email protected]>
  • Loading branch information
amathur1893 and Arpita authored Oct 22, 2020
1 parent 3b4e1c3 commit 244401b
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.FaultDetectionMetricsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterThrottlingMetricsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardStateCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper;
Expand All @@ -24,6 +25,8 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.http_action.config.PerformanceAnalyzerOverridesClusterConfigAction;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.http_action.config.PerformanceAnalyzerResourceProvider;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
Expand Down Expand Up @@ -55,6 +58,7 @@
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexModule;
Expand Down Expand Up @@ -107,6 +111,9 @@
public final class PerformanceAnalyzerPlugin extends Plugin implements ActionPlugin, NetworkPlugin, SearchPlugin {
private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerPlugin.class);
public static final String PLUGIN_NAME = "opendistro_performance_analyzer";
private static final String ADD_FAULT_DETECTION_METHOD = "addFaultDetectionListener";
private static final String LISTENER_INJECTOR_CLASS_PATH =
"com.amazon.opendistro.elasticsearch.performanceanalyzer.listener.ListenerInjector";
public static final int QUEUE_PURGE_INTERVAL_MS = 1000;
private static SecurityManager sm = null;
private final PerformanceAnalyzerClusterSettingHandler perfAnalyzerClusterSettingHandler;
Expand All @@ -125,6 +132,8 @@ public final class PerformanceAnalyzerPlugin extends Plugin implements ActionPlu
}
}



public static void invokePrivileged(Runnable runner) {
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
try {
Expand Down Expand Up @@ -200,6 +209,8 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new DisksCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new NetworkInterfaceCollector());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(StatsCollector.instance());
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new FaultDetectionMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new ShardStateCollector(
performanceAnalyzerController,configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new MasterThrottlingMetricsCollector(
Expand Down Expand Up @@ -235,6 +246,22 @@ public void onIndexModule(IndexModule indexModule) {
indexModule.addSearchOperationListener(performanceanalyzerSearchListener);
}

//follower check, leader check
public void onDiscovery(Discovery discovery) {
try {
Class<?> listenerInjector = Class.forName(LISTENER_INJECTOR_CLASS_PATH);
Object listenerInjectorInstance = listenerInjector.getDeclaredConstructor().newInstance();
Method addListenerMethod = listenerInjectorInstance.getClass().getMethod(ADD_FAULT_DETECTION_METHOD,
Discovery.class);
addListenerMethod.invoke(listenerInjectorInstance, discovery);
} catch (InstantiationException | InvocationTargetException | NoSuchMethodException |
IllegalAccessException e) {
LOG.debug("Exception while calling addFaultDetectionListener in Discovery");
} catch (ClassNotFoundException e) {
LOG.debug("No Class for ListenerInjector detected");
}
}

//- shardbulk
@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
Expand Down Expand Up @@ -301,4 +328,3 @@ public List<Setting<?>> getSettings() {
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsProcessor;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.WriterMetrics;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.tools.StringUtils;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics.addMetricEntry;

public class FaultDetectionMetricsCollector extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor {
public static final int SAMPLING_TIME_INTERVAL = MetricsConfiguration.CONFIG_MAP.
get(FaultDetectionMetricsCollector.class).samplingInterval;
private static final int KEYS_PATH_LENGTH = 3;
private static final Logger LOG = LogManager.getLogger(FaultDetectionMetricsCollector.class);
private static final String FAULT_DETECTION_HANDLER_NAME =
"com.amazon.opendistro.elasticsearch.performanceanalyzer.handler.ClusterFaultDetectionStatsHandler";
private static final String FAULT_DETECTION_HANDLER_METRIC_QUEUE = "metricQueue";
private final ConfigOverridesWrapper configOverridesWrapper;
private final PerformanceAnalyzerController controller;
private StringBuilder value;
private static final ObjectMapper mapper = new ObjectMapper();

public FaultDetectionMetricsCollector(PerformanceAnalyzerController controller,
ConfigOverridesWrapper configOverridesWrapper) {
super(SAMPLING_TIME_INTERVAL, "FaultDetectionMetricsCollector");
value = new StringBuilder();
this.configOverridesWrapper = configOverridesWrapper;
this.controller = controller;
}

@Override
@SuppressWarnings("unchecked")
void collectMetrics(long startTime) {
if(!controller.isCollectorEnabled(configOverridesWrapper, getCollectorName())) {
return;
}
long mCurrT = System.currentTimeMillis();
Class<?> faultDetectionHandler = null;
try {
faultDetectionHandler = Class.forName(FAULT_DETECTION_HANDLER_NAME);
} catch (ClassNotFoundException e) {
LOG.debug("No Handler Detected for Fault Detection. Skipping FaultDetectionMetricsCollector");
return;
}
try {
BlockingQueue<String> metricQueue = (BlockingQueue<String>)
getFaultDetectionHandlerMetricsQueue(faultDetectionHandler).get(null);
List<String> metrics = new ArrayList<>();
metricQueue.drainTo(metrics);

List<ClusterFaultDetectionContext> faultDetectionContextsList = new ArrayList<>();
for(String metric : metrics) {
faultDetectionContextsList.add(mapper.readValue(metric, ClusterFaultDetectionContext.class));
}

for(ClusterFaultDetectionContext clusterFaultDetectionContext : faultDetectionContextsList) {
value.setLength(0);
value.append(PerformanceAnalyzerMetrics.getCurrentTimeMetric());
addMetricEntry(value, AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID
.toString(), clusterFaultDetectionContext.getSourceNodeId());
addMetricEntry(value, AllMetrics.FaultDetectionDimension.TARGET_NODE_ID
.toString(), clusterFaultDetectionContext.getTargetNodeId());

if(StringUtils.isEmpty(clusterFaultDetectionContext.getStartTime())) {
addMetricEntry(value, AllMetrics.CommonMetric.FINISH_TIME.toString(),
clusterFaultDetectionContext.getFinishTime());
addMetricEntry(value, PerformanceAnalyzerMetrics.FAULT,
clusterFaultDetectionContext.getFault());
saveMetricValues(value.toString(), startTime, clusterFaultDetectionContext.getType(),
clusterFaultDetectionContext.getRequestId(), PerformanceAnalyzerMetrics.FINISH_FILE_NAME);
} else {
addMetricEntry(value, AllMetrics.CommonMetric.START_TIME.toString(),
clusterFaultDetectionContext.getStartTime());
saveMetricValues(value.toString(), startTime, clusterFaultDetectionContext.getType(),
clusterFaultDetectionContext.getRequestId(), PerformanceAnalyzerMetrics.START_FILE_NAME);
}
}
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.FAULT_DETECTION_COLLECTOR_EXECUTION_TIME, "",
System.currentTimeMillis() - mCurrT);
} catch (Exception ex) {
PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
ExceptionsAndErrors.FAULT_DETECTION_COLLECTOR_ERROR, "",
System.currentTimeMillis() - mCurrT);
LOG.debug("Exception in Collecting FaultDetection Metrics: {} for startTime {}",
() -> ex.toString(), () -> startTime);
}
}

Field getFaultDetectionHandlerMetricsQueue(Class<?> faultDetectionHandler) throws Exception {
Field metricsQueue = faultDetectionHandler.getDeclaredField(FAULT_DETECTION_HANDLER_METRIC_QUEUE);
metricsQueue.setAccessible(true);
return metricsQueue;
}

/** Sample Event
* ^fault_detection/follower_check/7627/finish
* current_time:1601486201861
* SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a
* TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a
* FinishTime:1566413987986
* fault:0$
*
* @param startTime time at which collector is called
* @param keysPath List of string that would make up the metrics path
* @return metric path
*/
@Override
public String getMetricsPath(long startTime, String... keysPath) {
if (keysPath.length != KEYS_PATH_LENGTH) {
throw new RuntimeException("keys length should be " + KEYS_PATH_LENGTH);
}

return PerformanceAnalyzerMetrics.generatePath(startTime, PerformanceAnalyzerMetrics.sFaultDetection,
keysPath[0], keysPath[1], keysPath[2]);
}

public static class ClusterFaultDetectionContext {
String type;
String sourceNodeId;
String targetNodeId;
String requestId;
String fault;
String startTime;
String finishTime;

public String getType() {
return this.type;
}

public String getSourceNodeId() {
return this.sourceNodeId;
}

public String getTargetNodeId() {
return this.targetNodeId;
}

public String getFault() {
return this.fault;
}

public String getStartTime() {
return this.startTime;
}

public String getFinishTime() {
return this.finishTime;
}

public String getRequestId() {
return this.requestId;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.CacheConfigMetricsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.FaultDetectionMetricsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.MasterThrottlingMetricsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ShardStateCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration;
Expand Down Expand Up @@ -53,6 +54,7 @@ public static void configureMetrics() {
MetricsConfiguration.CONFIG_MAP.put(NodeStatsFixedShardsMetricsCollector.class, cdefault);
MetricsConfiguration.CONFIG_MAP.put(MasterServiceEventMetrics.class, new MetricsConfiguration.MetricConfig(1000, 0, 0));
MetricsConfiguration.CONFIG_MAP.put(MasterServiceMetrics.class, cdefault);
MetricsConfiguration.CONFIG_MAP.put(FaultDetectionMetricsCollector.class, cdefault);
MetricsConfiguration.CONFIG_MAP.put(ShardStateCollector.class, cdefault);
MetricsConfiguration.CONFIG_MAP.put(MasterThrottlingMetricsCollector.class, cdefault);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.CustomMetricsLocationTestBase;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PerformanceAnalyzerController;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PluginSettings;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.overrides.ConfigOverridesWrapper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import com.fasterxml.jackson.databind.cfg.ConfigOverrides;
import org.junit.Test;
import org.mockito.Mockito;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class FaultDetectionMetricsCollectorTest extends CustomMetricsLocationTestBase {
@Test
public void testShardsStateMetrics() {
MetricsConfiguration.CONFIG_MAP.put(FaultDetectionMetricsCollector.class, MetricsConfiguration.cdefault);
System.setProperty("performanceanalyzer.metrics.log.enabled", "False");
long startTimeInMills = 1153721339;
PerformanceAnalyzerController controller = Mockito.mock(PerformanceAnalyzerController.class);
ConfigOverridesWrapper configOverrides = Mockito.mock(ConfigOverridesWrapper.class);
Mockito.when(controller.isCollectorEnabled(configOverrides, "FaultDetectionMetricsCollector"))
.thenReturn(true);
FaultDetectionMetricsCollector faultDetectionMetricsCollector = new FaultDetectionMetricsCollector(
controller, configOverrides);
faultDetectionMetricsCollector.saveMetricValues("fault_detection", startTimeInMills,
"follower_check", "65432", "start");
String fetchedValue = PerformanceAnalyzerMetrics.getMetric(PluginSettings.instance().getMetricsLocation()
+ PerformanceAnalyzerMetrics.getTimeInterval(startTimeInMills)+"/fault_detection/");
PerformanceAnalyzerMetrics.removeMetrics(PluginSettings.instance().getMetricsLocation()
+ PerformanceAnalyzerMetrics.getTimeInterval(startTimeInMills));
assertEquals("fault_detection", fetchedValue);

try {
faultDetectionMetricsCollector.saveMetricValues("fault_detection_metrics", startTimeInMills);
assertTrue("Negative scenario test: Should have been a RuntimeException", true);
} catch (RuntimeException ex) {
//- expecting exception...0 values passed; 3 expected
}

try {
faultDetectionMetricsCollector.saveMetricValues("fault_detection_metrics", startTimeInMills,
"leader_check");
assertTrue("Negative scenario test: Should have been a RuntimeException", true);
} catch (RuntimeException ex) {
//- expecting exception...1 values passed; 3 expected
}

try {
faultDetectionMetricsCollector.saveMetricValues("fault_detection_metrics", startTimeInMills,
"leader_check", "823765423");
assertTrue("Negative scenario test: Should have been a RuntimeException", true);
} catch (RuntimeException ex) {
//- expecting exception...2 values passed; 0 expected
}
}
}

0 comments on commit 244401b

Please sign in to comment.