diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java
index 6e479f4e9..aad4d465d 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java
@@ -1056,6 +1056,68 @@ public static class Constants {
public static final String SHARD_OP_COUNT_VALUE = "ShardEvents";
}
}
+ /*
+ * column names of FollowerCheck_Latency table
+ * SourceNodeId | TargetNodeID | sum | avg | min |max
+ *
+ * column names of LeaderCheck_Latency table
+ * SourceNodeId | TargetNodeID | sum | avg | min |max
+ *
+ * column names of FollowerCheck_Failure table
+ * SourceNodeId | TargetNodeID | sum | avg | min |max
+ *
+ * column names of LeaderCheck_Failure table
+ * SourceNodeId | TargetNodeID | sum | avg | min |max
+ *
+ *
Example:
+ * chMe07whRwGrOAqyLTP9vw|hgi7an4RwGrOAqyLTP9vw|1.0|0.2|0.0|1.0
+ */
+
+ public enum FaultDetectionMetric implements MetricValue {
+ FOLLOWER_CHECK_LATENCY(Constants.FOLLOWER_CHECK_LATENCY),
+ LEADER_CHECK_LATENCY(Constants.LEADER_CHECK_LATENCY),
+ FOLLOWER_CHECK_FAILURE(Constants.FOLLOWER_CHECK_FAILURE),
+ LEADER_CHECK_FAILURE(Constants.LEADER_CHECK_FAILURE);
+
+ private final String value;
+
+ FaultDetectionMetric(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ public static class Constants {
+ public static final String FOLLOWER_CHECK_LATENCY = "FollowerCheck_Latency";
+ public static final String LEADER_CHECK_LATENCY = "LeaderCheck_Latency";
+ public static final String FOLLOWER_CHECK_FAILURE = "FollowerCheck_Failure";
+ public static final String LEADER_CHECK_FAILURE = "LeaderCheck_Failure";
+ }
+ }
+
+ public enum FaultDetectionDimension implements MetricDimension {
+ SOURCE_NODE_ID(Constants.SOURCE_NODE_ID),
+ TARGET_NODE_ID(Constants.TARGET_NODE_ID);
+
+ private final String value;
+
+ FaultDetectionDimension(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ public static class Constants {
+ public static final String SOURCE_NODE_ID = "SourceNodeID";
+ public static final String TARGET_NODE_ID = "TargetNodeID";
+ }
+ }
public enum CommonDimension implements MetricDimension {
INDEX_NAME(Constants.INDEX_NAME_VALUE),
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java
index 37f20f17c..3feb2cd2d 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java
@@ -43,6 +43,7 @@ public class PerformanceAnalyzerMetrics {
public static final String sShardFetchPath = "shardfetch";
public static final String sShardQueryPath = "shardquery";
public static final String sMasterTaskPath = "master_task";
+ public static final String sFaultDetection = "fault_detection";
public static final String sHttpPath = "http";
public static final String sOSPath = "os_metrics";
public static final String sHeapPath = "heap_metrics";
@@ -62,6 +63,9 @@ public class PerformanceAnalyzerMetrics {
public static final String MASTER_CURRENT = "current";
public static final String MASTER_META_DATA = "metadata";
public static final String METRIC_CURRENT_TIME = "current_time";
+ public static final String FAULT_DETECTION_FOLLOWER_CHECK = "follower_check";
+ public static final String FAULT_DETECTION_LEADER_CHECK = "leader_check";
+ public static final String FAULT = "fault";
public static final int QUEUE_SIZE = PluginSettings.instance().getWriterQueueSize();
// TODO: Comeup with a more sensible number.
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java
index ea86ef15e..f899dd81d 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java
@@ -340,7 +340,26 @@ public class MetricsModel {
new MetricAttributes(
MetricUnits.MILLISECOND.toString(), AllMetrics.MasterMetricDimensions.values()));
- // Master Throttling Metrics
+ allMetricsInitializer.put(
+ AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_LATENCY.toString(),
+ new MetricAttributes(
+ MetricUnits.MILLISECOND.toString(), AllMetrics.FaultDetectionDimension.values()));
+
+ allMetricsInitializer.put(
+ AllMetrics.FaultDetectionMetric.LEADER_CHECK_LATENCY.toString(),
+ new MetricAttributes(
+ MetricUnits.MILLISECOND.toString(), AllMetrics.FaultDetectionDimension.values()));
+
+ allMetricsInitializer.put(
+ AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_FAILURE.toString(),
+ new MetricAttributes(
+ MetricUnits.COUNT.toString(), AllMetrics.FaultDetectionDimension.values()));
+
+ allMetricsInitializer.put(
+ AllMetrics.FaultDetectionMetric.LEADER_CHECK_FAILURE.toString(),
+ new MetricAttributes(
+ MetricUnits.COUNT.toString(), AllMetrics.FaultDetectionDimension.values()));
+
allMetricsInitializer.put(
AllMetrics.MasterThrottlingValue.MASTER_THROTTLED_PENDING_TASK_COUNT.toString(),
new MetricAttributes(MetricUnits.COUNT.toString(), EmptyDimension.values()));
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ExceptionsAndErrors.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ExceptionsAndErrors.java
index aa27bf81b..55100a0ae 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ExceptionsAndErrors.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ExceptionsAndErrors.java
@@ -51,7 +51,9 @@ public enum ExceptionsAndErrors implements MeasurementSet {
SHARD_STATE_COLLECTOR_ERROR("ShardStateCollectorError"),
- MASTER_THROTTLING_COLLECTOR_ERROR("MasterThrottlingMetricsCollector");
+ MASTER_THROTTLING_COLLECTOR_ERROR("MasterThrottlingMetricsCollector"),
+
+ FAULT_DETECTION_COLLECTOR_ERROR("FaultDetectionMetricsCollector");
/** What we want to appear as the metric name. */
private String name;
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java
index 0cff09434..056bd3a61 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java
@@ -67,8 +67,10 @@ public enum ReaderMetrics implements MeasurementSet {
* Amount of time taken to emit Master throttling metrics.
*/
MASTER_THROTTLING_EMITTER_EXECUTION_TIME("MasterThrottlingEmitterExecutionTime", "millis",
- Arrays.asList(Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM));
+ Arrays.asList(Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM)),
+ FAULT_DETECTION_METRICS_EMITTER_EXECUTION_TIME("FaultDetectionMetricsEmitterExecutionTime", "millis",
+ Arrays.asList(Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM));
/** What we want to appear as the metric name. */
private String name;
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java
index 11c542bf9..6277c3861 100644
--- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java
@@ -30,7 +30,10 @@ public enum WriterMetrics implements MeasurementSet {
Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM)),
MASTER_THROTTLING_COLLECTOR_NOT_AVAILABLE("MasterThrottlingCollectorNotAvailable", "count", Arrays.asList(
- Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM));
+ Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM)),
+
+ FAULT_DETECTION_COLLECTOR_EXECUTION_TIME("FaultDetectionCollectorExecutionTime", "millis", Arrays.asList(
+ Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM));
/** What we want to appear as the metric name. */
private String name;
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java
new file mode 100644
index 000000000..8e1351909
--- /dev/null
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonMetric;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.FaultDetectionDimension;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader_writer_shared.Event;
+import java.io.File;
+import java.sql.Connection;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jooq.BatchBindStep;
+
+public class FaultDetectionMetricsProcessor implements EventProcessor {
+ private static final Logger LOG = LogManager.getLogger(FaultDetectionMetricsProcessor.class);
+ private FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot;
+ private long startTime;
+ private long endTime;
+ private BatchBindStep handle;
+
+ public FaultDetectionMetricsProcessor(FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot) {
+ this.faultDetectionMetricsSnapshot = faultDetectionMetricsSnapshot;
+ }
+
+ static FaultDetectionMetricsProcessor buildFaultDetectionMetricsProcessor(
+ long currWindowStartTime,
+ Connection conn,
+ NavigableMap
+ faultDetectionMetricsMap) {
+
+ if (faultDetectionMetricsMap.get(currWindowStartTime) == null) {
+ FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot =
+ new FaultDetectionMetricsSnapshot(conn, currWindowStartTime);
+ Map.Entry entry = faultDetectionMetricsMap.lastEntry();
+ if (entry != null) {
+ faultDetectionMetricsSnapshot.rolloverInFlightRequests(entry.getValue());
+ }
+ faultDetectionMetricsMap.put(currWindowStartTime, faultDetectionMetricsSnapshot);
+ return new FaultDetectionMetricsProcessor(faultDetectionMetricsSnapshot);
+ } else {
+ return new FaultDetectionMetricsProcessor(faultDetectionMetricsMap.get(currWindowStartTime));
+ }
+ }
+
+ @Override
+ public void initializeProcessing(long startTime, long endTime) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.handle = faultDetectionMetricsSnapshot.startBatchPut();
+ }
+
+ @Override
+ public void finalizeProcessing() {
+ if (handle.size() > 0) {
+ handle.execute();
+ }
+ LOG.debug("Final Fault Detection request metrics {}", faultDetectionMetricsSnapshot.fetchAll());
+ }
+
+ @Override
+ public void processEvent(Event event) {
+ String[] keyItems = event.key.split(File.separatorChar == '\\' ? "\\\\" : File.separator);
+ assert keyItems.length == 4;
+ if (keyItems[0].equals(PerformanceAnalyzerMetrics.sFaultDetection)) {
+ if (keyItems[3].equals(PerformanceAnalyzerMetrics.START_FILE_NAME)) {
+ emitStartMetric(event, keyItems);
+ } else if (keyItems[3].equals(PerformanceAnalyzerMetrics.FINISH_FILE_NAME)) {
+ emitFinishMetric(event, keyItems);
+ }
+ }
+ }
+
+ @Override
+ public boolean shouldProcessEvent(Event event) {
+ return event.key.contains(PerformanceAnalyzerMetrics.sFaultDetection);
+ }
+
+ @Override
+ public void commitBatchIfRequired() {
+ if (handle.size() > BATCH_LIMIT) {
+ handle.execute();
+ handle = faultDetectionMetricsSnapshot.startBatchPut();
+ }
+ }
+
+ /**
+ * A keyItem is of the form : [fault_detection, follower_check, 76532, start]
+ * Example value part of the entry is:
+ * current_time:1566413979979
+ * StartTime:1566413987986
+ * SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a
+ * TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a
+ * $
+ * @param entry fault detection event.
+ * @param keyItems keys extracted from metrics path
+ */
+ private void emitStartMetric(Event entry, String[] keyItems) {
+ Map keyValueMap = ReaderMetricsProcessor.extractEntryData(entry.value);
+
+ String sourceNodeId = keyValueMap.get(FaultDetectionDimension.SOURCE_NODE_ID.toString());
+ String targetNodeId = keyValueMap.get(FaultDetectionDimension.TARGET_NODE_ID.toString());
+ String startTimeVal = keyValueMap.get(CommonMetric.START_TIME.toString());
+
+ try {
+ long st = Long.parseLong(startTimeVal);
+
+ String fault_detection_type = keyItems[1];
+ String rid = keyItems[2];
+ // A keyItem is of the form : [fault_detection, follower_check, 76543, start]
+ handle.bind(rid, sourceNodeId, targetNodeId, fault_detection_type, st, null, 0);
+ } catch (NumberFormatException e) {
+ LOG.error("Unable to parse string. StartTime:{}", startTimeVal);
+ StatsCollector.instance().logException(StatExceptionCode.READER_PARSER_ERROR);
+ throw e;
+ }
+ }
+
+ /**
+ * A keyItem is of the form : [fault_detection, follower_check, 76532, start]
+ * Example value part of the entry is:
+ * current_time:1566413979979
+ * FinishTime:1566413987986
+ * SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a
+ * TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a
+ * fault:0
+ * $
+ * @param entry fault detection event.
+ * @param keyItems keys extracted from metrics path
+ */
+ private void emitFinishMetric(Event entry, String[] keyItems) {
+ Map keyValueMap = ReaderMetricsProcessor.extractEntryData(entry.value);
+
+ String sourceNodeId = keyValueMap.get(FaultDetectionDimension.SOURCE_NODE_ID.toString());
+ String targetNodeId = keyValueMap.get(FaultDetectionDimension.TARGET_NODE_ID.toString());
+ String finishTimeVal = keyValueMap.get(CommonMetric.FINISH_TIME.toString());
+ String faultString = keyValueMap.get(PerformanceAnalyzerMetrics.FAULT);
+
+ try {
+ long et = Long.parseLong(finishTimeVal);
+ int fault = Integer.parseInt(faultString);
+
+ String fault_detection_type = keyItems[1];
+ String rid = keyItems[2];
+ // A keyItem is of the form : [fault_detection, follower_check, 76543, finish]
+ handle.bind(rid, sourceNodeId, targetNodeId, fault_detection_type, null, et, fault);
+ } catch (NumberFormatException e) {
+ LOG.error("Unable to parse string. StartTime:{}, Error:{}", finishTimeVal, faultString);
+ StatsCollector.instance().logException(StatExceptionCode.READER_PARSER_ERROR);
+ throw e;
+ }
+ }
+}
diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshot.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshot.java
new file mode 100644
index 000000000..bc5015a81
--- /dev/null
+++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshot.java
@@ -0,0 +1,260 @@
+/*
+ * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader;
+
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.DBUtils;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.FaultDetectionDimension;
+import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
+import com.google.common.annotations.VisibleForTesting;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jooq.BatchBindStep;
+import org.jooq.DSLContext;
+import org.jooq.Field;
+import org.jooq.Record;
+import org.jooq.Result;
+import org.jooq.SQLDialect;
+import org.jooq.SelectField;
+import org.jooq.SelectHavingStep;
+import org.jooq.impl.DSL;
+
+public class FaultDetectionMetricsSnapshot implements Removable {
+ private static final Logger LOG = LogManager.getLogger(FaultDetectionMetricsSnapshot.class);
+ private final DSLContext create;
+ private final Long windowStartTime;
+ private final String tableName;
+ private ArrayList> columns;
+ private static final Long EXPIRE_AFTER = 600000L;
+
+ public enum Fields {
+ RID("rid"),
+ FAULT_DETECTION_TYPE("type"),
+ ST("st"),
+ ET("et"),
+ LAT("lat"),
+ FAULT("fault");
+
+ private final String fieldValue;
+
+ Fields(String fieldValue) {
+ this.fieldValue = fieldValue;
+ }
+
+ @Override
+ public String toString() {
+ return fieldValue;
+ }
+
+ }
+
+ public FaultDetectionMetricsSnapshot(Connection conn, Long windowStartTime) {
+ this.create = DSL.using(conn, SQLDialect.SQLITE);
+ this.windowStartTime = windowStartTime;
+ this.tableName = "fault_detection_" + windowStartTime;
+
+ this.columns =
+ new ArrayList>() {
+ {
+ this.add(DSL.field(DSL.name(Fields.RID.toString()), String.class));
+ this.add(DSL.field(DSL.name(FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class));
+ this.add(DSL.field(DSL.name(FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class));
+ this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class));
+ this.add(DSL.field(DSL.name(Fields.ST.toString()), Long.class));
+ this.add(DSL.field(DSL.name(Fields.ET.toString()), Long.class));
+ this.add(DSL.field(DSL.name(Fields.FAULT.toString()), Integer.class));
+ }
+ };
+ create.createTable(this.tableName).columns(columns).execute();
+ }
+
+ public BatchBindStep startBatchPut() {
+
+ List