From 8fdbea096d2a44e284db261224bdba21271a70a5 Mon Sep 17 00:00:00 2001 From: Arpita Date: Wed, 14 Oct 2020 16:27:27 +0530 Subject: [PATCH 1/4] Publish Fault Detection Metrics --- .../metrics/AllMetrics.java | 46 ++++ .../metrics/PerformanceAnalyzerMetrics.java | 4 + .../model/MetricsModel.java | 20 ++ .../FaultDetectionMetricsProcessor.java | 153 +++++++++++ .../reader/FaultDetectionMetricsSnapshot.java | 257 ++++++++++++++++++ .../reader/MetricsEmitter.java | 138 ++++++++++ .../reader/ReaderMetricsProcessor.java | 20 ++ .../FaultDetectionMetricsSnapshotTests.java | 52 ++++ .../reader/MetricsEmitterTests.java | 38 +++ src/test/resources/reader/1566413960000 | 26 ++ src/test/resources/reader/1566413965000 | 26 ++ src/test/resources/reader/1566413970000 | 26 ++ 12 files changed, 806 insertions(+) create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java create mode 100644 src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshot.java create mode 100644 src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshotTests.java diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java index 8dd261684..0e445fe46 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java @@ -1030,6 +1030,52 @@ public static class Constants { } } + public enum FaultDetectionMetric implements MetricValue { + LATENCY_FOLLOWER_CHECK(Constants.LATENCY_FOLLOWER_CHECK), + LATENCY_LEADER_CHECK(Constants.LATENCY_LEADER_CHECK), + FAILURE_FOLLOWER_CHECK(Constants.FAILURE_FOLLOWER_CHECK), + FAILURE_LEADER_CHECK(Constants.FAILURE_LEADER_CHECK); + + private final String value; + + FaultDetectionMetric(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + public static class Constants { + public static final String LATENCY_FOLLOWER_CHECK = "Latency_FollowerCheck"; + public static final String LATENCY_LEADER_CHECK = "Latency_LeaderCheck"; + public static final String FAILURE_FOLLOWER_CHECK = "Failure_FollowerCheck"; + public static final String FAILURE_LEADER_CHECK = "Failure_LeaderCheck"; + } + } + + public enum FaultDetectionDimension implements MetricDimension { + SOURCE_NODE_ID(Constants.SOURCE_NODE_ID), + TARGET_NODE_ID(Constants.TARGET_NODE_ID); + + private final String value; + + FaultDetectionDimension(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + + public static class Constants { + public static final String SOURCE_NODE_ID = "SourceNodeID"; + public static final String TARGET_NODE_ID = "TargetNodeID"; + } + } + public enum CommonDimension implements MetricDimension { INDEX_NAME(Constants.INDEX_NAME_VALUE), OPERATION(Constants.OPERATION_VALUE), diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java index 527de19c5..91a4b22c6 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java @@ -43,6 +43,7 @@ public class PerformanceAnalyzerMetrics { public static final String sShardFetchPath = "shardfetch"; public static final String sShardQueryPath = "shardquery"; public static final String sMasterTaskPath = "master_task"; + public static final String sFaultDetection = "fault_detection"; public static final String sHttpPath = "http"; public static final String sOSPath = "os_metrics"; public static final String sHeapPath = "heap_metrics"; @@ -60,6 +61,9 @@ public class PerformanceAnalyzerMetrics { public static final String MASTER_CURRENT = "current"; public static final String MASTER_META_DATA = "metadata"; public static final String METRIC_CURRENT_TIME = "current_time"; + public static final String FAULT_DETECTION_FOLLOWER_CHECK = "follower_check"; + public static final String FAULT_DETECTION_LEADER_CHECK = "leader_check"; + public static final String ERROR = "error"; public static final int QUEUE_SIZE = PluginSettings.instance().getWriterQueueSize(); // TODO: Comeup with a more sensible number. diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java index 152e9524e..ddef6036a 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java @@ -340,6 +340,26 @@ public class MetricsModel { new MetricAttributes( MetricUnits.MILLISECOND.toString(), AllMetrics.MasterMetricDimensions.values())); + allMetricsInitializer.put( + AllMetrics.FaultDetectionMetric.LATENCY_FOLLOWER_CHECK.toString(), + new MetricAttributes( + MetricUnits.MILLISECOND.toString(), AllMetrics.FaultDetectionDimension.values())); + + allMetricsInitializer.put( + AllMetrics.FaultDetectionMetric.LATENCY_LEADER_CHECK.toString(), + new MetricAttributes( + MetricUnits.MILLISECOND.toString(), AllMetrics.FaultDetectionDimension.values())); + + allMetricsInitializer.put( + AllMetrics.FaultDetectionMetric.FAILURE_FOLLOWER_CHECK.toString(), + new MetricAttributes( + MetricUnits.COUNT.toString(), AllMetrics.FaultDetectionDimension.values())); + + allMetricsInitializer.put( + AllMetrics.FaultDetectionMetric.FAILURE_LEADER_CHECK.toString(), + new MetricAttributes( + MetricUnits.COUNT.toString(), AllMetrics.FaultDetectionDimension.values())); + ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer); } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java new file mode 100644 index 000000000..015ad62e3 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java @@ -0,0 +1,153 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader_writer_shared.Event; +import java.io.File; +import java.sql.Connection; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jooq.BatchBindStep; + +public class FaultDetectionMetricsProcessor implements EventProcessor { + private static final Logger LOG = LogManager.getLogger(FaultDetectionMetricsProcessor.class); + private FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot; + private long startTime; + private long endTime; + private BatchBindStep handle; + + public FaultDetectionMetricsProcessor(FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot) { + this.faultDetectionMetricsSnapshot = faultDetectionMetricsSnapshot; + } + + static FaultDetectionMetricsProcessor buildFaultDetectionMetricsProcessor( + long currWindowStartTime, + Connection conn, + NavigableMap faultDetectionMetricsMap) { + if (faultDetectionMetricsMap.get(currWindowStartTime) == null) { + FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot = + new FaultDetectionMetricsSnapshot(conn, currWindowStartTime); + Map.Entry entry = faultDetectionMetricsMap.lastEntry(); + if (entry != null) { + faultDetectionMetricsSnapshot.rolloverInFlightRequests(entry.getValue()); + } + faultDetectionMetricsMap.put(currWindowStartTime, faultDetectionMetricsSnapshot); + return new FaultDetectionMetricsProcessor(faultDetectionMetricsSnapshot); + } else { + return new FaultDetectionMetricsProcessor(faultDetectionMetricsMap.get(currWindowStartTime)); + } + } + + @Override + public void initializeProcessing(long startTime, long endTime) { + this.startTime = startTime; + this.endTime = endTime; + this.handle = faultDetectionMetricsSnapshot.startBatchPut(); + } + + @Override + public void finalizeProcessing() { + if (handle.size() > 0) { + handle.execute(); + } + LOG.debug("Final Fault Detection request metrics {}", faultDetectionMetricsSnapshot.fetchAll()); + } + + @Override + public void processEvent(Event event) { + String[] keyItems = event.key.split(File.separatorChar == '\\' ? "\\\\" : File.separator); + + for (String key:keyItems) { + LOG.error("Hello Key - " + key); + } + if (keyItems[0].equals(PerformanceAnalyzerMetrics.sFaultDetection)) { + LOG.error("It is starting"); + if (keyItems[3].equals(PerformanceAnalyzerMetrics.START_FILE_NAME)) { + emitStartMetric(event, keyItems); + } else if (keyItems[3].equals(PerformanceAnalyzerMetrics.FINISH_FILE_NAME)) { + LOG.error("It is finishing"); + emitFinishMetric(event, keyItems); + } + } + } + + @Override + public boolean shouldProcessEvent(Event event) { + return event.key.contains(PerformanceAnalyzerMetrics.sFaultDetection); + } + + @Override + public void commitBatchIfRequired() { + if (handle.size() > BATCH_LIMIT) { + handle.execute(); + handle = faultDetectionMetricsSnapshot.startBatchPut(); + } + } + + // A keyItem is of the form : [fault_detection, follower_check, 76532, start] + // + // Example value part of the entry is: + // current_time:1566413979979 + // StartTime:1566413987986 + // SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a + // TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a + // $ + private void emitStartMetric(Event entry, String[] keyItems) { + Map keyValueMap = ReaderMetricsProcessor.extractEntryData(entry.value); + + String sourceNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()); + String targetNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()); + String startTimeVal = keyValueMap.get(AllMetrics.CommonMetric.START_TIME.toString()); + String fault_detection_type = keyItems[1]; + + LOG.error("You know what the values are - " + sourceNodeId + " " + targetNodeId + " " + startTimeVal + fault_detection_type); + + try { + long st = Long.parseLong(startTimeVal); + // A keyItem is of the form : [fault_detection, follower_check, 76543, start] + String rid = keyItems[2]; + handle.bind(rid, sourceNodeId, targetNodeId, fault_detection_type, st, null, 0); + } catch (NumberFormatException e) { + LOG.error("Unable to parse string. StartTime:{}", startTimeVal); + StatsCollector.instance().logException(StatExceptionCode.READER_PARSER_ERROR); + throw e; + } + } + + // A keyItem is of the form : [threads, http, bulk, 43369, start] + // + // Example value part of the entry is: + // current_time:1566413979979 + // StartTime:1566413987986 + // SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a + // TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a + // $ + private void emitFinishMetric(Event entry, String[] keyItems) { + Map keyValueMap = ReaderMetricsProcessor.extractEntryData(entry.value); + + String sourceNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()); + String targetNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()); + String finishTimeVal = keyValueMap.get(AllMetrics.CommonMetric.FINISH_TIME.toString()); + String errorString = keyValueMap.get(PerformanceAnalyzerMetrics.ERROR); + String fault_detection_type = keyItems[1]; + + LOG.error("Lets see finish values - " + sourceNodeId + " " + targetNodeId + " " + finishTimeVal + " " + errorString); + + try { + long et = Long.parseLong(finishTimeVal); + int error = Integer.parseInt(errorString); + // A keyItem is of the form : [fault_detection, follower_check, 76543, finish] + String rid = keyItems[2]; + handle.bind(rid, sourceNodeId, targetNodeId, fault_detection_type, null, et, error); + } catch (NumberFormatException e) { + LOG.error("Unable to parse string. StartTime:{}, Error:{}", finishTimeVal, errorString); + StatsCollector.instance().logException(StatExceptionCode.READER_PARSER_ERROR); + throw e; + } + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshot.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshot.java new file mode 100644 index 000000000..db5c41065 --- /dev/null +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshot.java @@ -0,0 +1,257 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.DBUtils; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; +import java.sql.Connection; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jooq.BatchBindStep; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.Record; +import org.jooq.Result; +import org.jooq.SQLDialect; +import org.jooq.SelectField; +import org.jooq.SelectHavingStep; +import org.jooq.impl.DSL; + +public class FaultDetectionMetricsSnapshot implements Removable { + private static final Logger LOG = LogManager.getLogger(FaultDetectionMetricsSnapshot.class); + private final DSLContext create; + private final Long windowStartTime; + private final String tableName; + private ArrayList> columns; + private static final Long EXPIRE_AFTER = 600000L; + + public enum Fields { + RID("rid"), + FAULT_DETECTION_TYPE("type"), + ST("st"), + ET("et"), + LAT("lat"), + ERROR("Error"); + + private final String fieldValue; + + Fields(String fieldValue) { + this.fieldValue = fieldValue; + } + + @Override + public String toString() { + return fieldValue; + } + + } + + public FaultDetectionMetricsSnapshot(Connection conn, Long windowStartTime) { + this.create = DSL.using(conn, SQLDialect.SQLITE); + this.windowStartTime = windowStartTime; + this.tableName = "fault_detection_" + windowStartTime; + + this.columns = + new ArrayList>() { + { + this.add(DSL.field(DSL.name(Fields.RID.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class)); + this.add(DSL.field(DSL.name(Fields.ST.toString()), Long.class)); + this.add(DSL.field(DSL.name(Fields.ET.toString()), Long.class)); + this.add(DSL.field(DSL.name(Fields.ERROR.toString()), Integer.class)); + } + }; + create.createTable(this.tableName).columns(columns).execute(); + } + + public BatchBindStep startBatchPut() { + + List dummyValues = new ArrayList<>(); + for (int i = 0; i < columns.size(); i++) { + dummyValues.add(null); + } + return create.batch(create.insertInto(DSL.table(this.tableName)).values(dummyValues)); + } + + @VisibleForTesting + public void putStartMetric(Long startTime, Map dimensions) { + Map, String> dimensionMap = new HashMap<>(); + for (Map.Entry dimension : dimensions.entrySet()) { + dimensionMap.put(DSL.field(DSL.name(dimension.getKey()), String.class), dimension.getValue()); + } + create + .insertInto(DSL.table(this.tableName)) + .set(DSL.field(DSL.name(Fields.ST.toString()), Long.class), startTime) + .set(dimensionMap) + .execute(); + } + + @VisibleForTesting + public void putEndMetric(Long endTime, int error, Map dimensions) { + Map, String> dimensionMap = new HashMap<>(); + for (Map.Entry dimension : dimensions.entrySet()) { + dimensionMap.put(DSL.field(DSL.name(dimension.getKey()), String.class), dimension.getValue()); + } + create + .insertInto(DSL.table(this.tableName)) + .set(DSL.field(DSL.name(Fields.ET.toString()), Long.class), endTime) + .set(DSL.field(DSL.name(Fields.ERROR.toString()), Integer.class), error) + .set(dimensionMap) + .execute(); + } + + public Result fetchAll() { + return create.select().from(DSL.table(this.tableName)).fetch(); + } + + @Override + public void remove() throws Exception { + create.dropTable(DSL.table(this.tableName)).execute(); + } + + public void rolloverInFlightRequests(FaultDetectionMetricsSnapshot prevSnap) { + // Fetch all entries that have not ended and write to current table. + create + .insertInto(DSL.table(this.tableName)) + .select(create.select().from(prevSnap.fetchInFlightRequests())) + .execute(); + } + + public SelectHavingStep fetchInFlightRequests() { + ArrayList> fields = + new ArrayList>() { + { + this.add(DSL.field(DSL.name(Fields.RID.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class)); + this.add(DSL.field(DSL.name(Fields.ERROR.toString()), String.class)); + this.add(DSL.field(Fields.ST.toString(), Long.class)); + this.add(DSL.field(Fields.ET.toString(), Long.class)); + } + }; + + return create + .select(fields) + .from(groupByRidAndTypeSelect()) + .where( + DSL.field(Fields.ST.toString()) + .isNotNull() + .and(DSL.field(Fields.ET.toString()).isNull()) + .and(DSL.field(Fields.ST.toString()).gt(this.windowStartTime - EXPIRE_AFTER))); + } + + public SelectHavingStep groupByRidAndTypeSelect() { + ArrayList> fields = + new ArrayList>() { + { + this.add(DSL.field(DSL.name(Fields.RID.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class)); + } + }; + fields.add( + DSL.max(DSL.field(Fields.ST.toString(), Long.class)).as(DSL.name(Fields.ST.toString()))); + fields.add( + DSL.max(DSL.field(Fields.ET.toString(), Long.class)).as(DSL.name(Fields.ET.toString()))); + fields.add( + DSL.max(DSL.field(Fields.ERROR.toString(), Integer.class)).as(DSL.name(Fields.ERROR.toString()))); + LOG.error("Initial - " + fetchAll().size()); + LOG.error("Breakdown 1 - " + create + .select(fields) + .from(DSL.table(this.tableName)) + .groupBy(DSL.field(Fields.RID.toString()), DSL.field(Fields.FAULT_DETECTION_TYPE.toString())).fetch().size()); + return create + .select(fields) + .from(DSL.table(this.tableName)) + .groupBy(DSL.field(Fields.RID.toString()), DSL.field(Fields.FAULT_DETECTION_TYPE.toString())); + } + + public SelectHavingStep fetchLatencyTable() { + ArrayList> fields = + new ArrayList>() { + { + this.add(DSL.field(DSL.name(Fields.RID.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()), Long.class)); + this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class)); + this.add(DSL.field(Fields.ST.toString(), Long.class)); + this.add(DSL.field(Fields.ET.toString(), Long.class)); + this.add(DSL.field(Fields.ERROR.toString(), Integer.class)); + } + }; + fields.add( + DSL.field(Fields.ET.toString()) + .minus(DSL.field(Fields.ST.toString())) + .as(DSL.name(Fields.LAT.toString()))); + LOG.error("Breakdown 2 - " + create + .select(fields) + .from(groupByRidAndTypeSelect()) + .where( + DSL.field(Fields.ET.toString()) + .isNotNull() + .and(DSL.field(Fields.ST.toString()).isNotNull())).fetch().size()); + return create + .select(fields) + .from(groupByRidAndTypeSelect()) + .where( + DSL.field(Fields.ET.toString()) + .isNotNull() + .and(DSL.field(Fields.ST.toString()).isNotNull())); + } + + public Result fetchAggregatedTable() { + ArrayList> fields = + new ArrayList>() { + { + this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class)); + + this.add( + DSL.sum(DSL.field(DSL.name(Fields.LAT.toString()), Double.class)) + .as(DBUtils.getAggFieldName(Fields.LAT.toString(), MetricsDB.SUM))); + this.add( + DSL.avg(DSL.field(DSL.name(Fields.LAT.toString()), Double.class)) + .as(DBUtils.getAggFieldName(Fields.LAT.toString(), MetricsDB.AVG))); + this.add( + DSL.min(DSL.field(DSL.name(Fields.LAT.toString()), Double.class)) + .as(DBUtils.getAggFieldName(Fields.LAT.toString(), MetricsDB.MIN))); + this.add( + DSL.max(DSL.field(DSL.name(Fields.LAT.toString()), Double.class)) + .as(DBUtils.getAggFieldName(Fields.LAT.toString(), MetricsDB.MAX))); + + this.add( + DSL.sum(DSL.field(DSL.name(Fields.ERROR.toString()), Double.class)) + .as(DBUtils.getAggFieldName(Fields.ERROR.toString(), MetricsDB.SUM))); + this.add( + DSL.avg(DSL.field(DSL.name(Fields.ERROR.toString()), Double.class)) + .as(DBUtils.getAggFieldName(Fields.ERROR.toString(), MetricsDB.AVG))); + this.add( + DSL.min(DSL.field(DSL.name(Fields.ERROR.toString()), Double.class)) + .as(DBUtils.getAggFieldName(Fields.ERROR.toString(), MetricsDB.MIN))); + this.add( + DSL.max(DSL.field(DSL.name(Fields.ERROR.toString()), Double.class)) + .as(DBUtils.getAggFieldName(Fields.ERROR.toString(), MetricsDB.MAX))); + } + }; + ArrayList> groupByFields = + new ArrayList>() { + { + this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class)); + } + }; + + return create.select(fields).from(fetchLatencyTable()).groupBy(groupByFields).fetch(); + } +} diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java index 7af1c2e86..78f7eb062 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java @@ -22,6 +22,7 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.GCInfoDimension; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.GCInfoValue; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.OSMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.Dimensions; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.Metric; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; @@ -847,4 +848,141 @@ public static void emitNodeMetrics( "Total time taken for writing {} metrics metricsdb: {}", tableName, mFinalT - mCurrT); } } + + public static void emitFaultDetectionMetrics(DSLContext create, MetricsDB db, FaultDetectionMetricsSnapshot faultDetectionSnapshot) { + + long mCurrT = System.currentTimeMillis(); + Dimensions dimensions = new Dimensions(); + Result res = faultDetectionSnapshot.fetchAggregatedTable(); + LOG.error("AHH record size is " + res.size()); + List dims = + new ArrayList() { + { + this.add(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()); + this.add(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()); + } + }; + + db.createMetric( + new Metric(AllMetrics.FaultDetectionMetric.LATENCY_FOLLOWER_CHECK.toString(), 0d), + dims); + + db.createMetric( + new Metric(AllMetrics.FaultDetectionMetric.LATENCY_LEADER_CHECK.toString(), 0d), + dims); + + db.createMetric( + new Metric(AllMetrics.FaultDetectionMetric.FAILURE_FOLLOWER_CHECK.toString(), 0d), + dims); + + db.createMetric( + new Metric(AllMetrics.FaultDetectionMetric.FAILURE_LEADER_CHECK.toString(), 0d), + dims); + + for (Record r : res) { + dimensions.put( + AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString(), + r.get(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()).toString()); + dimensions.put( + AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString(), + r.get(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()).toString()); + + Double sumLatency = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.LAT.toString(), MetricsDB.SUM)) + .toString()); + Double avgLatency = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.LAT.toString(), MetricsDB.AVG)) + .toString()); + Double minLatency = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.LAT.toString(), MetricsDB.MIN)) + .toString()); + Double maxLatency = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.LAT.toString(), MetricsDB.MAX)) + .toString()); + + Double sumError = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.ERROR.toString(), + MetricsDB.SUM)) + .toString()); + Double avgError = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.ERROR.toString(), + MetricsDB.AVG)) + .toString()); + Double minError = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.ERROR.toString(), + MetricsDB.MIN)) + .toString()); + Double maxError = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.ERROR.toString(), + MetricsDB.MAX)) + .toString()); + if (r.get(FaultDetectionMetricsSnapshot.Fields.FAULT_DETECTION_TYPE.toString()).toString() + .equals(PerformanceAnalyzerMetrics.FAULT_DETECTION_FOLLOWER_CHECK)) { + db.putMetric( + new Metric( + AllMetrics.FaultDetectionMetric.LATENCY_FOLLOWER_CHECK.toString(), + sumLatency, + avgLatency, + minLatency, + maxLatency), + dimensions, + 0); + db.putMetric( + new Metric( + AllMetrics.FaultDetectionMetric.FAILURE_FOLLOWER_CHECK.toString(), + sumError, + avgError, + minError, + maxError), + dimensions, + 0); + } else if (r.get(FaultDetectionMetricsSnapshot.Fields.FAULT_DETECTION_TYPE.toString()).toString() + .equals(PerformanceAnalyzerMetrics.FAULT_DETECTION_LEADER_CHECK)) { + db.putMetric( + new Metric( + AllMetrics.FaultDetectionMetric.LATENCY_LEADER_CHECK.toString(), + sumLatency, + avgLatency, + minLatency, + maxLatency), + dimensions, + 0); + db.putMetric( + new Metric( + AllMetrics.FaultDetectionMetric.FAILURE_LEADER_CHECK.toString(), + sumError, + avgError, + minError, + maxError), + dimensions, + 0); + } + } + long mFinalT = System.currentTimeMillis(); + LOG.debug("Total time taken for writing fault detection metrics to metricsdb: {}", mFinalT - mCurrT); + } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java index 82cf36ede..5629ffcfe 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java @@ -72,11 +72,13 @@ public class ReaderMetricsProcessor implements Runnable { private NavigableMap masterEventMetricsMap; private NavigableMap gcInfoMap; private Map> nodeMetricsMap; + private NavigableMap faultDetectionMetricsMap; private static final int MAX_DATABASES = 2; private static final int OS_SNAPSHOTS = 4; private static final int RQ_SNAPSHOTS = 4; private static final int HTTP_RQ_SNAPSHOTS = 4; private static final int MASTER_EVENT_SNAPSHOTS = 4; + private static final int FAULT_DETECTION_SNAPSHOTS = 2; private static final int GC_INFO_SNAPSHOTS = 4; private final String rootLocation; private static final Map TIMING_STATS = new HashMap<>(); @@ -119,6 +121,7 @@ public ReaderMetricsProcessor(String rootLocation, boolean processNewFormat, fin shardRqMetricsMap = new TreeMap<>(); httpRqMetricsMap = new TreeMap<>(); masterEventMetricsMap = new TreeMap<>(); + faultDetectionMetricsMap = new TreeMap<>(); gcInfoMap = new TreeMap<>(); this.rootLocation = rootLocation; this.configOverridesApplier = new ConfigOverridesApplier(); @@ -240,6 +243,7 @@ public void trimOldSnapshots() throws Exception { trimMap(shardRqMetricsMap, RQ_SNAPSHOTS); trimMap(httpRqMetricsMap, HTTP_RQ_SNAPSHOTS); trimMap(masterEventMetricsMap, MASTER_EVENT_SNAPSHOTS); + trimMap(faultDetectionMetricsMap, FAULT_DETECTION_SNAPSHOTS); trimMap(gcInfoMap, GC_INFO_SNAPSHOTS); for (NavigableMap snap : nodeMetricsMap.values()) { @@ -353,6 +357,7 @@ private void emitMetrics(long currWindowStartTime) throws Exception { emitShardRequestMetrics(prevWindowStartTime, alignedOSSnapHolder, osAlignedSnap, metricsDB); emitHttpRequestMetrics(prevWindowStartTime, metricsDB); emitNodeMetrics(currWindowStartTime, metricsDB); + emitFaultDetectionMetrics(prevWindowStartTime, metricsDB); metricsDB.commit(); metricsDBMap.put(prevWindowStartTime, metricsDB); @@ -363,6 +368,17 @@ private void emitMetrics(long currWindowStartTime) throws Exception { LOG.debug("Total time taken for emitting Metrics: {}", mFinalT - mCurrT); TIMING_STATS.put("emitMetrics", (double) (mFinalT - mCurrT)); } + + private void emitFaultDetectionMetrics(long prevWindowStartTime, MetricsDB metricsDB) { + if (faultDetectionMetricsMap.containsKey(prevWindowStartTime)) { + + FaultDetectionMetricsSnapshot prevFaultDetectionSnap = faultDetectionMetricsMap.get(prevWindowStartTime); + MetricsEmitter.emitFaultDetectionMetrics(create, metricsDB, prevFaultDetectionSnap); + } else { + LOG.debug( + "Http request snapshot for the previous window does not exist. Not emitting metrics."); + } + } private void emitGarbageCollectionInfo(long prevWindowStartTime, MetricsDB metricsDB) throws Exception { if (gcInfoMap.containsKey(prevWindowStartTime)) { @@ -513,6 +529,9 @@ is ready so it starts to read that file (go back two windows and EventProcessor httpProcessor = HttpRequestEventProcessor.buildHttpRequestMetricEventsProcessor( currWindowStartTime, currWindowEndTime, conn, httpRqMetricsMap); + EventProcessor faultDetectionProcessor = + FaultDetectionMetricsProcessor.buildFaultDetectionMetricsProcessor( + currWindowStartTime, conn, faultDetectionMetricsMap); EventProcessor masterEventsProcessor = MasterMetricsEventProcessor.buildMasterMetricEventsProcessor( currWindowStartTime, conn, masterEventMetricsMap); @@ -540,6 +559,7 @@ is ready so it starts to read that file (go back two windows and eventDispatcher.registerEventProcessor(nodeEventsProcessor); eventDispatcher.registerEventProcessor(masterEventsProcessor); eventDispatcher.registerEventProcessor(clusterDetailsEventsProcessor); + eventDispatcher.registerEventProcessor(faultDetectionProcessor); eventDispatcher.registerEventProcessor(garbageCollectorInfoProcessor); eventDispatcher.initializeProcessing( diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshotTests.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshotTests.java new file mode 100644 index 000000000..abb7fd04d --- /dev/null +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshotTests.java @@ -0,0 +1,52 @@ +package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; + +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import org.jooq.BatchBindStep; +import org.jooq.Record; +import org.jooq.Result; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; + +import static org.junit.Assert.assertEquals; + +public class FaultDetectionMetricsSnapshotTests { + private static final String DB_URL = "jdbc:sqlite:"; + private Connection conn; + + @Before + public void setup() throws Exception { + Class.forName("org.sqlite.JDBC"); + System.setProperty("java.io.tmpdir", "/tmp"); + conn = DriverManager.getConnection(DB_URL); + } + + @Test + public void testPutMetrics() { + FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot = + new FaultDetectionMetricsSnapshot(conn, 1535065195000L); + BatchBindStep handle = faultDetectionMetricsSnapshot.startBatchPut(); + + handle.bind("1", "sourceNode", "targetNodeId", "follower_check",1535065195000L, null, 0); + handle.bind("1", "sourceNode", "targetNodeId", "follower_check", null, 1535065195050L, 0); + handle.execute(); + Result rt = faultDetectionMetricsSnapshot.fetchAggregatedTable(); + + assertEquals(1, rt.size()); + Double latency = Double.parseDouble(rt.get(0).get("sum_" + FaultDetectionMetricsSnapshot.Fields.LAT.toString()).toString()); + assertEquals(50d, latency.doubleValue(), 0); + assertEquals( + "sourceNode", rt.get(0).get(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString())); + assertEquals( + "targetNodeId", + rt.get(0).get(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString())); + assertEquals( + "follower_check", + rt.get(0).get(FaultDetectionMetricsSnapshot.Fields.FAULT_DETECTION_TYPE.toString())); + assertEquals( + 0, + Integer.parseInt(rt.get(0).get("sum_" + FaultDetectionMetricsSnapshot.Fields.ERROR.toString()).toString())); + } +} diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitterTests.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitterTests.java index da4cd5897..c41e61d82 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitterTests.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitterTests.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.TroubleshootingConfig; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonMetric; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.GCInfoDimension; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.GCInfoValue; @@ -374,6 +375,43 @@ public void testEmitNodeMetrics() throws Exception { } db.remove(); } + + @Test + public void testFaultDetectionMetricsEmitter() throws Exception { + Connection conn = DriverManager.getConnection(DB_URL); + FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot = new FaultDetectionMetricsSnapshot(conn, 1L); + Map dimensions = new HashMap<>(); + dimensions.put(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString(), "sourceNodeId"); + dimensions.put(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString(), "targetNodeId"); + dimensions.put(FaultDetectionMetricsSnapshot.Fields.FAULT_DETECTION_TYPE.toString(), "follower_check"); + dimensions.put(FaultDetectionMetricsSnapshot.Fields.RID.toString(), "1"); + faultDetectionMetricsSnapshot.putStartMetric(12345L, dimensions); + faultDetectionMetricsSnapshot.putEndMetric(33325L, 0, dimensions); + + dimensions.put(FaultDetectionMetricsSnapshot.Fields.RID.toString(), "2"); + faultDetectionMetricsSnapshot.putStartMetric(22245L, dimensions); + + dimensions.put(FaultDetectionMetricsSnapshot.Fields.RID.toString(), "3"); + faultDetectionMetricsSnapshot.putStartMetric(10000L, dimensions); + faultDetectionMetricsSnapshot.putEndMetric(30000L, 1, dimensions); + + DSLContext create = DSL.using(conn, SQLDialect.SQLITE); + MetricsDB db = new MetricsDB(1553713438); + MetricsEmitter.emitFaultDetectionMetrics(create, db, faultDetectionMetricsSnapshot); + Result res = + db.queryMetric( + Arrays.asList( + AllMetrics.FaultDetectionMetric.LATENCY_FOLLOWER_CHECK.toString(), + AllMetrics.FaultDetectionMetric.FAILURE_FOLLOWER_CHECK.toString()), + Arrays.asList("avg", "sum"), + Arrays.asList(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString())); + + Float latency = Float.parseFloat(res.get(0).get(AllMetrics.FaultDetectionMetric.LATENCY_FOLLOWER_CHECK.toString()) + .toString()); + db.remove(); + assertEquals(20490.0f, latency.floatValue(), 0); + } +} @Test public void testEmitGCTypeMetric() throws Exception { diff --git a/src/test/resources/reader/1566413960000 b/src/test/resources/reader/1566413960000 index ce404a79f..78f8a4279 100644 --- a/src/test/resources/reader/1566413960000 +++ b/src/test/resources/reader/1566413960000 @@ -62,6 +62,32 @@ $ {"MemType":"NonHeap","GC_Collection_Event":-2,"GC_Collection_Time":-2,"Heap_Committed":260165632,"Heap_Init":2555904,"Heap_Max":-1,"Heap_Used":248759720} {"MemType":"Heap","GC_Collection_Event":-2,"GC_Collection_Time":-2,"Heap_Committed":17145004032,"Heap_Init":17179869184,"Heap_Max":17145004032,"Heap_Used":5991469464} $ +^fault_detection/follower_check/7627/start +current_time:1566413936500 +SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a +TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a +StartTime:1566413988986 +$ +^fault_detection/follower_check/7627/finish +current_time:1566413936550 +SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a +TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a +FinishTime:1566413989986 +error:0 +$ +^fault_detection/follower_check/7649/start +current_time:1566413936507 +SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a +TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a +StartTime:1566413987786 +$ +^fault_detection/follower_check/7649/finish +current_time:1566413936559 +SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a +TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a +FinishTime:1566413987986 +error:0 +$ ^indices/nyc_taxis_2/27 {"current_time":1566413936500} {"Indexing_ThrottleTime":0,"Cache_Query_Hit":8,"Cache_Query_Miss":0,"Cache_Query_Size":483027,"Cache_FieldData_Eviction":0,"Cache_FieldData_Size":0,"Cache_Request_Hit":0,"Cache_Request_Miss":0,"Cache_Request_Eviction":0,"Cache_Request_Size":0,"Refresh_Event":0,"Refresh_Time":0,"Flush_Event":0,"Flush_Time":0,"Merge_Event":0,"Merge_Time":0,"Merge_CurrentEvent":0,"Indexing_Buffer":0,"Segments_Total":21,"Segments_Memory":1508419,"Terms_Memory":1066993,"StoredFields_Memory":116608,"TermVectors_Memory":1066993,"Norms_Memory":0,"Points_Memory":276918,"DocValues_Memory":47900,"IndexWriter_Memory":0,"VersionMap_Memory":0,"Bitset_Memory":0}$ diff --git a/src/test/resources/reader/1566413965000 b/src/test/resources/reader/1566413965000 index 8a5a49fde..cede49ff5 100644 --- a/src/test/resources/reader/1566413965000 +++ b/src/test/resources/reader/1566413965000 @@ -61,6 +61,32 @@ $ {"MemType":"NonHeap","GC_Collection_Event":-2,"GC_Collection_Time":-2,"Heap_Committed":260427776,"Heap_Init":2555904,"Heap_Max":-1,"Heap_Used":249123672} {"MemType":"Heap","GC_Collection_Event":-2,"GC_Collection_Time":-2,"Heap_Committed":17145004032,"Heap_Init":17179869184,"Heap_Max":17145004032,"Heap_Used":5973041688} $ +^fault_detection/follower_check/7627/start +current_time:1566413966497 +SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a +TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a +StartTime:1566413988986 +$ +^fault_detection/follower_check/7627/finish +current_time:1566413966497 +SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a +TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a +FinishTime:1566413989986 +error:0 +$ +^fault_detection/follower_check/7649/start +current_time:1566413966497 +SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a +TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a +StartTime:1566413987786 +$ +^fault_detection/follower_check/7649/finish +current_time:1566413966497 +SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a +TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a +FinishTime:1566413987986 +error:0 +$ ^indices/nyc_taxis/27 {"current_time":1566413966497} {"Indexing_ThrottleTime":0,"Cache_Query_Hit":0,"Cache_Query_Miss":0,"Cache_Query_Size":0,"Cache_FieldData_Eviction":0,"Cache_FieldData_Size":0,"Cache_Request_Hit":0,"Cache_Request_Miss":0,"Cache_Request_Eviction":0,"Cache_Request_Size":0,"Refresh_Event":2,"Refresh_Time":0,"Flush_Event":0,"Flush_Time":0,"Merge_Event":0,"Merge_Time":0,"Merge_CurrentEvent":0,"Indexing_Buffer":0,"Segments_Total":0,"Segments_Memory":0,"Terms_Memory":0,"StoredFields_Memory":0,"TermVectors_Memory":0,"Norms_Memory":0,"Points_Memory":0,"DocValues_Memory":0,"IndexWriter_Memory":0,"VersionMap_Memory":0,"Bitset_Memory":0}$ diff --git a/src/test/resources/reader/1566413970000 b/src/test/resources/reader/1566413970000 index 419e000a5..c6214cd34 100644 --- a/src/test/resources/reader/1566413970000 +++ b/src/test/resources/reader/1566413970000 @@ -62,6 +62,32 @@ $ {"MemType":"NonHeap","GC_Collection_Event":-2,"GC_Collection_Time":-2,"Heap_Committed":260427776,"Heap_Init":2555904,"Heap_Max":-1,"Heap_Used":249158360} {"MemType":"Heap","GC_Collection_Event":-2,"GC_Collection_Time":-2,"Heap_Committed":17145004032,"Heap_Init":17179869184,"Heap_Max":17145004032,"Heap_Used":8569832928} $ +^fault_detection/follower_check/7627/start +current_time:1566413996768 +SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a +TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a +StartTime:1566413988986 +$ +^fault_detection/follower_check/7627/finish +current_time:1566413996768 +SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a +TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a +FinishTime:1566413989986 +error:0 +$ +^fault_detection/follower_check/7649/start +current_time:1566413996768 +SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a +TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a +StartTime:1566413987786 +$ +^fault_detection/follower_check/7649/finish +current_time:1566413996768 +SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a +TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a +FinishTime:1566413987986 +error:0 +$ ^indices/nyc_taxis_1/27 {"current_time":1566413996768} {"Indexing_ThrottleTime":0,"Cache_Query_Hit":0,"Cache_Query_Miss":0,"Cache_Query_Size":0,"Cache_FieldData_Eviction":0,"Cache_FieldData_Size":0,"Cache_Request_Hit":0,"Cache_Request_Miss":0,"Cache_Request_Eviction":0,"Cache_Request_Size":0,"Refresh_Event":0,"Refresh_Time":0,"Flush_Event":0,"Flush_Time":0,"Merge_Event":0,"Merge_Time":0,"Merge_CurrentEvent":0,"Indexing_Buffer":11114952,"Segments_Total":0,"Segments_Memory":0,"Terms_Memory":0,"StoredFields_Memory":0,"TermVectors_Memory":0,"Norms_Memory":0,"Points_Memory":0,"DocValues_Memory":0,"IndexWriter_Memory":11114952,"VersionMap_Memory":0,"Bitset_Memory":0}$ From 867752f4de0fa7a9c1a14b21b401e0a0d808d90a Mon Sep 17 00:00:00 2001 From: Arpita Date: Wed, 14 Oct 2020 16:27:27 +0530 Subject: [PATCH 2/4] Publish Fault Detection Metrics --- .../reader/FaultDetectionMetricsProcessor.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java index 015ad62e3..4712ed0f3 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java @@ -62,15 +62,10 @@ public void finalizeProcessing() { public void processEvent(Event event) { String[] keyItems = event.key.split(File.separatorChar == '\\' ? "\\\\" : File.separator); - for (String key:keyItems) { - LOG.error("Hello Key - " + key); - } if (keyItems[0].equals(PerformanceAnalyzerMetrics.sFaultDetection)) { - LOG.error("It is starting"); if (keyItems[3].equals(PerformanceAnalyzerMetrics.START_FILE_NAME)) { emitStartMetric(event, keyItems); } else if (keyItems[3].equals(PerformanceAnalyzerMetrics.FINISH_FILE_NAME)) { - LOG.error("It is finishing"); emitFinishMetric(event, keyItems); } } @@ -104,9 +99,6 @@ private void emitStartMetric(Event entry, String[] keyItems) { String targetNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()); String startTimeVal = keyValueMap.get(AllMetrics.CommonMetric.START_TIME.toString()); String fault_detection_type = keyItems[1]; - - LOG.error("You know what the values are - " + sourceNodeId + " " + targetNodeId + " " + startTimeVal + fault_detection_type); - try { long st = Long.parseLong(startTimeVal); // A keyItem is of the form : [fault_detection, follower_check, 76543, start] @@ -135,9 +127,6 @@ private void emitFinishMetric(Event entry, String[] keyItems) { String finishTimeVal = keyValueMap.get(AllMetrics.CommonMetric.FINISH_TIME.toString()); String errorString = keyValueMap.get(PerformanceAnalyzerMetrics.ERROR); String fault_detection_type = keyItems[1]; - - LOG.error("Lets see finish values - " + sourceNodeId + " " + targetNodeId + " " + finishTimeVal + " " + errorString); - try { long et = Long.parseLong(finishTimeVal); int error = Integer.parseInt(errorString); From d1f587f4950141fe228fcc86c97ab3e46523e14e Mon Sep 17 00:00:00 2001 From: Arpita Date: Wed, 14 Oct 2020 16:27:27 +0530 Subject: [PATCH 3/4] Publish Fault Detection Metrics --- .../reader/FaultDetectionMetricsProcessor.java | 6 +++--- .../performanceanalyzer/reader/ReaderMetricsProcessor.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java index 4712ed0f3..bf6aa871f 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java @@ -61,7 +61,6 @@ public void finalizeProcessing() { @Override public void processEvent(Event event) { String[] keyItems = event.key.split(File.separatorChar == '\\' ? "\\\\" : File.separator); - if (keyItems[0].equals(PerformanceAnalyzerMetrics.sFaultDetection)) { if (keyItems[3].equals(PerformanceAnalyzerMetrics.START_FILE_NAME)) { emitStartMetric(event, keyItems); @@ -111,13 +110,14 @@ private void emitStartMetric(Event entry, String[] keyItems) { } } - // A keyItem is of the form : [threads, http, bulk, 43369, start] + // A keyItem is of the form : [fault_detection, follower_check, 76532, finish] // // Example value part of the entry is: // current_time:1566413979979 - // StartTime:1566413987986 + // FinishTime:1566413987986 // SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a // TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a + // Error:0 // $ private void emitFinishMetric(Event entry, String[] keyItems) { Map keyValueMap = ReaderMetricsProcessor.extractEntryData(entry.value); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java index 5629ffcfe..c0cb9104b 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java @@ -376,7 +376,7 @@ private void emitFaultDetectionMetrics(long prevWindowStartTime, MetricsDB metri MetricsEmitter.emitFaultDetectionMetrics(create, metricsDB, prevFaultDetectionSnap); } else { LOG.debug( - "Http request snapshot for the previous window does not exist. Not emitting metrics."); + "Fault Detection snapshot for the previous window does not exist. Not emitting metrics."); } } From e01f059f96038d2cab40090cab653175271e9d7f Mon Sep 17 00:00:00 2001 From: Arpita Date: Wed, 21 Oct 2020 23:16:36 +0530 Subject: [PATCH 4/4] Publish Fault Detection Metrics --- .../metrics/AllMetrics.java | 32 ++- .../metrics/PerformanceAnalyzerMetrics.java | 2 +- .../model/MetricsModel.java | 8 +- .../metrics/ExceptionsAndErrors.java | 4 +- .../rca/framework/metrics/ReaderMetrics.java | 4 +- .../rca/framework/metrics/WriterMetrics.java | 5 +- .../FaultDetectionMetricsProcessor.java | 95 ++++--- .../reader/FaultDetectionMetricsSnapshot.java | 83 +++--- .../reader/MetricsEmitter.java | 261 +++++++++--------- .../reader/ReaderMetricsProcessor.java | 8 +- .../FaultDetectionMetricsSnapshotTests.java | 26 +- .../reader/MetricsEmitterTests.java | 9 +- src/test/resources/reader/1566413960000 | 22 +- src/test/resources/reader/1566413965000 | 20 +- src/test/resources/reader/1566413970000 | 20 +- 15 files changed, 329 insertions(+), 270 deletions(-) diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java index 5573fc0d4..aad4d465d 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/AllMetrics.java @@ -1056,12 +1056,28 @@ public static class Constants { public static final String SHARD_OP_COUNT_VALUE = "ShardEvents"; } } + /* + * column names of FollowerCheck_Latency table + * SourceNodeId | TargetNodeID | sum | avg | min |max + * + * column names of LeaderCheck_Latency table + * SourceNodeId | TargetNodeID | sum | avg | min |max + * + * column names of FollowerCheck_Failure table + * SourceNodeId | TargetNodeID | sum | avg | min |max + * + * column names of LeaderCheck_Failure table + * SourceNodeId | TargetNodeID | sum | avg | min |max + * + *

Example: + * chMe07whRwGrOAqyLTP9vw|hgi7an4RwGrOAqyLTP9vw|1.0|0.2|0.0|1.0 + */ public enum FaultDetectionMetric implements MetricValue { - LATENCY_FOLLOWER_CHECK(Constants.LATENCY_FOLLOWER_CHECK), - LATENCY_LEADER_CHECK(Constants.LATENCY_LEADER_CHECK), - FAILURE_FOLLOWER_CHECK(Constants.FAILURE_FOLLOWER_CHECK), - FAILURE_LEADER_CHECK(Constants.FAILURE_LEADER_CHECK); + FOLLOWER_CHECK_LATENCY(Constants.FOLLOWER_CHECK_LATENCY), + LEADER_CHECK_LATENCY(Constants.LEADER_CHECK_LATENCY), + FOLLOWER_CHECK_FAILURE(Constants.FOLLOWER_CHECK_FAILURE), + LEADER_CHECK_FAILURE(Constants.LEADER_CHECK_FAILURE); private final String value; @@ -1075,10 +1091,10 @@ public String toString() { } public static class Constants { - public static final String LATENCY_FOLLOWER_CHECK = "Latency_FollowerCheck"; - public static final String LATENCY_LEADER_CHECK = "Latency_LeaderCheck"; - public static final String FAILURE_FOLLOWER_CHECK = "Failure_FollowerCheck"; - public static final String FAILURE_LEADER_CHECK = "Failure_LeaderCheck"; + public static final String FOLLOWER_CHECK_LATENCY = "FollowerCheck_Latency"; + public static final String LEADER_CHECK_LATENCY = "LeaderCheck_Latency"; + public static final String FOLLOWER_CHECK_FAILURE = "FollowerCheck_Failure"; + public static final String LEADER_CHECK_FAILURE = "LeaderCheck_Failure"; } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java index 3786861f5..3feb2cd2d 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/metrics/PerformanceAnalyzerMetrics.java @@ -65,7 +65,7 @@ public class PerformanceAnalyzerMetrics { public static final String METRIC_CURRENT_TIME = "current_time"; public static final String FAULT_DETECTION_FOLLOWER_CHECK = "follower_check"; public static final String FAULT_DETECTION_LEADER_CHECK = "leader_check"; - public static final String ERROR = "error"; + public static final String FAULT = "fault"; public static final int QUEUE_SIZE = PluginSettings.instance().getWriterQueueSize(); // TODO: Comeup with a more sensible number. diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java index 167d362a7..f899dd81d 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/model/MetricsModel.java @@ -341,22 +341,22 @@ public class MetricsModel { MetricUnits.MILLISECOND.toString(), AllMetrics.MasterMetricDimensions.values())); allMetricsInitializer.put( - AllMetrics.FaultDetectionMetric.LATENCY_FOLLOWER_CHECK.toString(), + AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_LATENCY.toString(), new MetricAttributes( MetricUnits.MILLISECOND.toString(), AllMetrics.FaultDetectionDimension.values())); allMetricsInitializer.put( - AllMetrics.FaultDetectionMetric.LATENCY_LEADER_CHECK.toString(), + AllMetrics.FaultDetectionMetric.LEADER_CHECK_LATENCY.toString(), new MetricAttributes( MetricUnits.MILLISECOND.toString(), AllMetrics.FaultDetectionDimension.values())); allMetricsInitializer.put( - AllMetrics.FaultDetectionMetric.FAILURE_FOLLOWER_CHECK.toString(), + AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_FAILURE.toString(), new MetricAttributes( MetricUnits.COUNT.toString(), AllMetrics.FaultDetectionDimension.values())); allMetricsInitializer.put( - AllMetrics.FaultDetectionMetric.FAILURE_LEADER_CHECK.toString(), + AllMetrics.FaultDetectionMetric.LEADER_CHECK_FAILURE.toString(), new MetricAttributes( MetricUnits.COUNT.toString(), AllMetrics.FaultDetectionDimension.values())); diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ExceptionsAndErrors.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ExceptionsAndErrors.java index aa27bf81b..55100a0ae 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ExceptionsAndErrors.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ExceptionsAndErrors.java @@ -51,7 +51,9 @@ public enum ExceptionsAndErrors implements MeasurementSet { SHARD_STATE_COLLECTOR_ERROR("ShardStateCollectorError"), - MASTER_THROTTLING_COLLECTOR_ERROR("MasterThrottlingMetricsCollector"); + MASTER_THROTTLING_COLLECTOR_ERROR("MasterThrottlingMetricsCollector"), + + FAULT_DETECTION_COLLECTOR_ERROR("FaultDetectionMetricsCollector"); /** What we want to appear as the metric name. */ private String name; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java index 0cff09434..056bd3a61 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java @@ -67,8 +67,10 @@ public enum ReaderMetrics implements MeasurementSet { * Amount of time taken to emit Master throttling metrics. */ MASTER_THROTTLING_EMITTER_EXECUTION_TIME("MasterThrottlingEmitterExecutionTime", "millis", - Arrays.asList(Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM)); + Arrays.asList(Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM)), + FAULT_DETECTION_METRICS_EMITTER_EXECUTION_TIME("FaultDetectionMetricsEmitterExecutionTime", "millis", + Arrays.asList(Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM)); /** What we want to appear as the metric name. */ private String name; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java index 11c542bf9..6277c3861 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/rca/framework/metrics/WriterMetrics.java @@ -30,7 +30,10 @@ public enum WriterMetrics implements MeasurementSet { Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM)), MASTER_THROTTLING_COLLECTOR_NOT_AVAILABLE("MasterThrottlingCollectorNotAvailable", "count", Arrays.asList( - Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM)); + Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM)), + + FAULT_DETECTION_COLLECTOR_EXECUTION_TIME("FaultDetectionCollectorExecutionTime", "millis", Arrays.asList( + Statistics.MAX, Statistics.MIN, Statistics.MEAN, Statistics.COUNT, Statistics.SUM)); /** What we want to appear as the metric name. */ private String name; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java index bf6aa871f..8e1351909 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsProcessor.java @@ -1,8 +1,24 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode; import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.FaultDetectionDimension; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader_writer_shared.Event; import java.io.File; @@ -28,7 +44,9 @@ public FaultDetectionMetricsProcessor(FaultDetectionMetricsSnapshot faultDetecti static FaultDetectionMetricsProcessor buildFaultDetectionMetricsProcessor( long currWindowStartTime, Connection conn, - NavigableMap faultDetectionMetricsMap) { + NavigableMap + faultDetectionMetricsMap) { + if (faultDetectionMetricsMap.get(currWindowStartTime) == null) { FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot = new FaultDetectionMetricsSnapshot(conn, currWindowStartTime); @@ -61,6 +79,7 @@ public void finalizeProcessing() { @Override public void processEvent(Event event) { String[] keyItems = event.key.split(File.separatorChar == '\\' ? "\\\\" : File.separator); + assert keyItems.length == 4; if (keyItems[0].equals(PerformanceAnalyzerMetrics.sFaultDetection)) { if (keyItems[3].equals(PerformanceAnalyzerMetrics.START_FILE_NAME)) { emitStartMetric(event, keyItems); @@ -83,25 +102,30 @@ public void commitBatchIfRequired() { } } - // A keyItem is of the form : [fault_detection, follower_check, 76532, start] - // - // Example value part of the entry is: - // current_time:1566413979979 - // StartTime:1566413987986 - // SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a - // TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a - // $ + /** + * A keyItem is of the form : [fault_detection, follower_check, 76532, start] + * Example value part of the entry is: + * current_time:1566413979979 + * StartTime:1566413987986 + * SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a + * TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a + * $ + * @param entry fault detection event. + * @param keyItems keys extracted from metrics path + */ private void emitStartMetric(Event entry, String[] keyItems) { Map keyValueMap = ReaderMetricsProcessor.extractEntryData(entry.value); - String sourceNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()); - String targetNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()); - String startTimeVal = keyValueMap.get(AllMetrics.CommonMetric.START_TIME.toString()); - String fault_detection_type = keyItems[1]; + String sourceNodeId = keyValueMap.get(FaultDetectionDimension.SOURCE_NODE_ID.toString()); + String targetNodeId = keyValueMap.get(FaultDetectionDimension.TARGET_NODE_ID.toString()); + String startTimeVal = keyValueMap.get(CommonMetric.START_TIME.toString()); + try { long st = Long.parseLong(startTimeVal); - // A keyItem is of the form : [fault_detection, follower_check, 76543, start] + + String fault_detection_type = keyItems[1]; String rid = keyItems[2]; + // A keyItem is of the form : [fault_detection, follower_check, 76543, start] handle.bind(rid, sourceNodeId, targetNodeId, fault_detection_type, st, null, 0); } catch (NumberFormatException e) { LOG.error("Unable to parse string. StartTime:{}", startTimeVal); @@ -110,31 +134,36 @@ private void emitStartMetric(Event entry, String[] keyItems) { } } - // A keyItem is of the form : [fault_detection, follower_check, 76532, finish] - // - // Example value part of the entry is: - // current_time:1566413979979 - // FinishTime:1566413987986 - // SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a - // TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a - // Error:0 - // $ + /** + * A keyItem is of the form : [fault_detection, follower_check, 76532, start] + * Example value part of the entry is: + * current_time:1566413979979 + * FinishTime:1566413987986 + * SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a + * TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a + * fault:0 + * $ + * @param entry fault detection event. + * @param keyItems keys extracted from metrics path + */ private void emitFinishMetric(Event entry, String[] keyItems) { Map keyValueMap = ReaderMetricsProcessor.extractEntryData(entry.value); - String sourceNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()); - String targetNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()); - String finishTimeVal = keyValueMap.get(AllMetrics.CommonMetric.FINISH_TIME.toString()); - String errorString = keyValueMap.get(PerformanceAnalyzerMetrics.ERROR); - String fault_detection_type = keyItems[1]; + String sourceNodeId = keyValueMap.get(FaultDetectionDimension.SOURCE_NODE_ID.toString()); + String targetNodeId = keyValueMap.get(FaultDetectionDimension.TARGET_NODE_ID.toString()); + String finishTimeVal = keyValueMap.get(CommonMetric.FINISH_TIME.toString()); + String faultString = keyValueMap.get(PerformanceAnalyzerMetrics.FAULT); + try { long et = Long.parseLong(finishTimeVal); - int error = Integer.parseInt(errorString); - // A keyItem is of the form : [fault_detection, follower_check, 76543, finish] + int fault = Integer.parseInt(faultString); + + String fault_detection_type = keyItems[1]; String rid = keyItems[2]; - handle.bind(rid, sourceNodeId, targetNodeId, fault_detection_type, null, et, error); + // A keyItem is of the form : [fault_detection, follower_check, 76543, finish] + handle.bind(rid, sourceNodeId, targetNodeId, fault_detection_type, null, et, fault); } catch (NumberFormatException e) { - LOG.error("Unable to parse string. StartTime:{}, Error:{}", finishTimeVal, errorString); + LOG.error("Unable to parse string. StartTime:{}, Error:{}", finishTimeVal, faultString); StatsCollector.instance().logException(StatExceptionCode.READER_PARSER_ERROR); throw e; } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshot.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshot.java index db5c41065..bc5015a81 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshot.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshot.java @@ -1,15 +1,30 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; import com.amazon.opendistro.elasticsearch.performanceanalyzer.DBUtils; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.FaultDetectionDimension; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB; +import com.google.common.annotations.VisibleForTesting; import java.sql.Connection; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jooq.BatchBindStep; @@ -36,7 +51,7 @@ public enum Fields { ST("st"), ET("et"), LAT("lat"), - ERROR("Error"); + FAULT("fault"); private final String fieldValue; @@ -60,12 +75,12 @@ public FaultDetectionMetricsSnapshot(Connection conn, Long windowStartTime) { new ArrayList>() { { this.add(DSL.field(DSL.name(Fields.RID.toString()), String.class)); - this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); - this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class)); this.add(DSL.field(DSL.name(Fields.ST.toString()), Long.class)); this.add(DSL.field(DSL.name(Fields.ET.toString()), Long.class)); - this.add(DSL.field(DSL.name(Fields.ERROR.toString()), Integer.class)); + this.add(DSL.field(DSL.name(Fields.FAULT.toString()), Integer.class)); } }; create.createTable(this.tableName).columns(columns).execute(); @@ -102,7 +117,7 @@ public void putEndMetric(Long endTime, int error, Map dimensions create .insertInto(DSL.table(this.tableName)) .set(DSL.field(DSL.name(Fields.ET.toString()), Long.class), endTime) - .set(DSL.field(DSL.name(Fields.ERROR.toString()), Integer.class), error) + .set(DSL.field(DSL.name(Fields.FAULT.toString()), Integer.class), error) .set(dimensionMap) .execute(); } @@ -129,10 +144,10 @@ public SelectHavingStep fetchInFlightRequests() { new ArrayList>() { { this.add(DSL.field(DSL.name(Fields.RID.toString()), String.class)); - this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); - this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class)); - this.add(DSL.field(DSL.name(Fields.ERROR.toString()), String.class)); + this.add(DSL.field(DSL.name(Fields.FAULT.toString()), String.class)); this.add(DSL.field(Fields.ST.toString(), Long.class)); this.add(DSL.field(Fields.ET.toString(), Long.class)); } @@ -153,8 +168,8 @@ public SelectHavingStep groupByRidAndTypeSelect() { new ArrayList>() { { this.add(DSL.field(DSL.name(Fields.RID.toString()), String.class)); - this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); - this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class)); } }; @@ -163,12 +178,7 @@ public SelectHavingStep groupByRidAndTypeSelect() { fields.add( DSL.max(DSL.field(Fields.ET.toString(), Long.class)).as(DSL.name(Fields.ET.toString()))); fields.add( - DSL.max(DSL.field(Fields.ERROR.toString(), Integer.class)).as(DSL.name(Fields.ERROR.toString()))); - LOG.error("Initial - " + fetchAll().size()); - LOG.error("Breakdown 1 - " + create - .select(fields) - .from(DSL.table(this.tableName)) - .groupBy(DSL.field(Fields.RID.toString()), DSL.field(Fields.FAULT_DETECTION_TYPE.toString())).fetch().size()); + DSL.max(DSL.field(Fields.FAULT.toString(), Integer.class)).as(DSL.name(Fields.FAULT.toString()))); return create .select(fields) .from(DSL.table(this.tableName)) @@ -180,25 +190,18 @@ public SelectHavingStep fetchLatencyTable() { new ArrayList>() { { this.add(DSL.field(DSL.name(Fields.RID.toString()), String.class)); - this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); - this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()), Long.class)); + this.add(DSL.field(DSL.name(FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(FaultDetectionDimension.TARGET_NODE_ID.toString()), Long.class)); this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class)); this.add(DSL.field(Fields.ST.toString(), Long.class)); this.add(DSL.field(Fields.ET.toString(), Long.class)); - this.add(DSL.field(Fields.ERROR.toString(), Integer.class)); + this.add(DSL.field(Fields.FAULT.toString(), Integer.class)); } }; fields.add( DSL.field(Fields.ET.toString()) .minus(DSL.field(Fields.ST.toString())) .as(DSL.name(Fields.LAT.toString()))); - LOG.error("Breakdown 2 - " + create - .select(fields) - .from(groupByRidAndTypeSelect()) - .where( - DSL.field(Fields.ET.toString()) - .isNotNull() - .and(DSL.field(Fields.ST.toString()).isNotNull())).fetch().size()); return create .select(fields) .from(groupByRidAndTypeSelect()) @@ -212,8 +215,8 @@ public Result fetchAggregatedTable() { ArrayList> fields = new ArrayList>() { { - this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); - this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class)); this.add( @@ -230,24 +233,24 @@ public Result fetchAggregatedTable() { .as(DBUtils.getAggFieldName(Fields.LAT.toString(), MetricsDB.MAX))); this.add( - DSL.sum(DSL.field(DSL.name(Fields.ERROR.toString()), Double.class)) - .as(DBUtils.getAggFieldName(Fields.ERROR.toString(), MetricsDB.SUM))); + DSL.sum(DSL.field(DSL.name(Fields.FAULT.toString()), Double.class)) + .as(DBUtils.getAggFieldName(Fields.FAULT.toString(), MetricsDB.SUM))); this.add( - DSL.avg(DSL.field(DSL.name(Fields.ERROR.toString()), Double.class)) - .as(DBUtils.getAggFieldName(Fields.ERROR.toString(), MetricsDB.AVG))); + DSL.avg(DSL.field(DSL.name(Fields.FAULT.toString()), Double.class)) + .as(DBUtils.getAggFieldName(Fields.FAULT.toString(), MetricsDB.AVG))); this.add( - DSL.min(DSL.field(DSL.name(Fields.ERROR.toString()), Double.class)) - .as(DBUtils.getAggFieldName(Fields.ERROR.toString(), MetricsDB.MIN))); + DSL.min(DSL.field(DSL.name(Fields.FAULT.toString()), Double.class)) + .as(DBUtils.getAggFieldName(Fields.FAULT.toString(), MetricsDB.MIN))); this.add( - DSL.max(DSL.field(DSL.name(Fields.ERROR.toString()), Double.class)) - .as(DBUtils.getAggFieldName(Fields.ERROR.toString(), MetricsDB.MAX))); + DSL.max(DSL.field(DSL.name(Fields.FAULT.toString()), Double.class)) + .as(DBUtils.getAggFieldName(Fields.FAULT.toString(), MetricsDB.MAX))); } }; ArrayList> groupByFields = new ArrayList>() { { - this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); - this.add(DSL.field(DSL.name(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(FaultDetectionDimension.SOURCE_NODE_ID.toString()), String.class)); + this.add(DSL.field(DSL.name(FaultDetectionDimension.TARGET_NODE_ID.toString()), String.class)); this.add(DSL.field(DSL.name(Fields.FAULT_DETECTION_TYPE.toString()), String.class)); } }; diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java index a27639a95..ed9f662d5 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitter.java @@ -20,6 +20,8 @@ import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.TroubleshootingConfig; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonMetric; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.FaultDetectionDimension; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.FaultDetectionMetric; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.GCInfoDimension; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.GCInfoValue; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.OSMetrics; @@ -103,6 +105,14 @@ public class MetricsEmitter { } }; + private static final List FAULT_DETECTION_TABLE_DIMENSIONS = + new ArrayList() { + { + this.add(FaultDetectionDimension.SOURCE_NODE_ID.toString()); + this.add(FaultDetectionDimension.TARGET_NODE_ID.toString()); + } + }; + public static void emitAggregatedOSMetrics( final DSLContext create, final MetricsDB db, @@ -864,142 +874,135 @@ public static void emitNodeMetrics( } } - public static void emitFaultDetectionMetrics(DSLContext create, MetricsDB db, FaultDetectionMetricsSnapshot faultDetectionSnapshot) { + public static void emitFaultDetectionMetrics(MetricsDB db, FaultDetectionMetricsSnapshot faultDetectionSnapshot) { - long mCurrT = System.currentTimeMillis(); - Dimensions dimensions = new Dimensions(); - Result res = faultDetectionSnapshot.fetchAggregatedTable(); - LOG.error("AHH record size is " + res.size()); - List dims = - new ArrayList() { - { - this.add(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()); - this.add(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()); - } - }; + long mCurrT = System.currentTimeMillis(); + Dimensions dimensions = new Dimensions(); + Result res = faultDetectionSnapshot.fetchAggregatedTable(); - db.createMetric( - new Metric(AllMetrics.FaultDetectionMetric.LATENCY_FOLLOWER_CHECK.toString(), 0d), - dims); + db.createMetric( + new Metric(FaultDetectionMetric.FOLLOWER_CHECK_LATENCY.toString(), 0d), + FAULT_DETECTION_TABLE_DIMENSIONS); - db.createMetric( - new Metric(AllMetrics.FaultDetectionMetric.LATENCY_LEADER_CHECK.toString(), 0d), - dims); + db.createMetric( + new Metric(FaultDetectionMetric.LEADER_CHECK_LATENCY.toString(), 0d), + FAULT_DETECTION_TABLE_DIMENSIONS); - db.createMetric( - new Metric(AllMetrics.FaultDetectionMetric.FAILURE_FOLLOWER_CHECK.toString(), 0d), - dims); + db.createMetric( + new Metric(FaultDetectionMetric.FOLLOWER_CHECK_FAILURE.toString(), 0d), + FAULT_DETECTION_TABLE_DIMENSIONS); - db.createMetric( - new Metric(AllMetrics.FaultDetectionMetric.FAILURE_LEADER_CHECK.toString(), 0d), - dims); - - for (Record r : res) { - dimensions.put( - AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString(), - r.get(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString()).toString()); - dimensions.put( - AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString(), - r.get(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString()).toString()); - - Double sumLatency = - Double.parseDouble( - r.get( - DBUtils.getAggFieldName( - FaultDetectionMetricsSnapshot.Fields.LAT.toString(), MetricsDB.SUM)) - .toString()); - Double avgLatency = - Double.parseDouble( - r.get( - DBUtils.getAggFieldName( - FaultDetectionMetricsSnapshot.Fields.LAT.toString(), MetricsDB.AVG)) - .toString()); - Double minLatency = - Double.parseDouble( - r.get( - DBUtils.getAggFieldName( - FaultDetectionMetricsSnapshot.Fields.LAT.toString(), MetricsDB.MIN)) - .toString()); - Double maxLatency = - Double.parseDouble( - r.get( - DBUtils.getAggFieldName( - FaultDetectionMetricsSnapshot.Fields.LAT.toString(), MetricsDB.MAX)) - .toString()); - - Double sumError = - Double.parseDouble( - r.get( - DBUtils.getAggFieldName( - FaultDetectionMetricsSnapshot.Fields.ERROR.toString(), - MetricsDB.SUM)) - .toString()); - Double avgError = - Double.parseDouble( - r.get( - DBUtils.getAggFieldName( - FaultDetectionMetricsSnapshot.Fields.ERROR.toString(), - MetricsDB.AVG)) - .toString()); - Double minError = - Double.parseDouble( - r.get( - DBUtils.getAggFieldName( - FaultDetectionMetricsSnapshot.Fields.ERROR.toString(), - MetricsDB.MIN)) - .toString()); - Double maxError = - Double.parseDouble( - r.get( - DBUtils.getAggFieldName( - FaultDetectionMetricsSnapshot.Fields.ERROR.toString(), - MetricsDB.MAX)) - .toString()); - if (r.get(FaultDetectionMetricsSnapshot.Fields.FAULT_DETECTION_TYPE.toString()).toString() - .equals(PerformanceAnalyzerMetrics.FAULT_DETECTION_FOLLOWER_CHECK)) { - db.putMetric( - new Metric( - AllMetrics.FaultDetectionMetric.LATENCY_FOLLOWER_CHECK.toString(), - sumLatency, - avgLatency, - minLatency, - maxLatency), - dimensions, - 0); - db.putMetric( - new Metric( - AllMetrics.FaultDetectionMetric.FAILURE_FOLLOWER_CHECK.toString(), - sumError, - avgError, - minError, - maxError), - dimensions, - 0); - } else if (r.get(FaultDetectionMetricsSnapshot.Fields.FAULT_DETECTION_TYPE.toString()).toString() - .equals(PerformanceAnalyzerMetrics.FAULT_DETECTION_LEADER_CHECK)) { - db.putMetric( - new Metric( - AllMetrics.FaultDetectionMetric.LATENCY_LEADER_CHECK.toString(), - sumLatency, - avgLatency, - minLatency, - maxLatency), - dimensions, - 0); - db.putMetric( - new Metric( - AllMetrics.FaultDetectionMetric.FAILURE_LEADER_CHECK.toString(), - sumError, - avgError, - minError, - maxError), - dimensions, - 0); + db.createMetric( + new Metric(FaultDetectionMetric.LEADER_CHECK_FAILURE.toString(), 0d), + FAULT_DETECTION_TABLE_DIMENSIONS); + for (Record r : res) { + dimensions.put( + FaultDetectionDimension.SOURCE_NODE_ID.toString(), + r.get(FaultDetectionDimension.SOURCE_NODE_ID.toString()).toString()); + dimensions.put( + FaultDetectionDimension.TARGET_NODE_ID.toString(), + r.get(FaultDetectionDimension.TARGET_NODE_ID.toString()).toString()); + + Double sumLatency = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.LAT.toString(), MetricsDB.SUM)) + .toString()); + Double avgLatency = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.LAT.toString(), MetricsDB.AVG)) + .toString()); + Double minLatency = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.LAT.toString(), MetricsDB.MIN)) + .toString()); + Double maxLatency = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.LAT.toString(), MetricsDB.MAX)) + .toString()); + + Double sumFault = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.FAULT.toString(), + MetricsDB.SUM)) + .toString()); + Double avgFault = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.FAULT.toString(), + MetricsDB.AVG)) + .toString()); + Double minFault = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.FAULT.toString(), + MetricsDB.MIN)) + .toString()); + Double maxFault = + Double.parseDouble( + r.get( + DBUtils.getAggFieldName( + FaultDetectionMetricsSnapshot.Fields.FAULT.toString(), + MetricsDB.MAX)) + .toString()); + if (r.get(FaultDetectionMetricsSnapshot.Fields.FAULT_DETECTION_TYPE.toString()).toString() + .equals(PerformanceAnalyzerMetrics.FAULT_DETECTION_FOLLOWER_CHECK)) { + db.putMetric( + new Metric( + FaultDetectionMetric.FOLLOWER_CHECK_LATENCY.toString(), + sumLatency, + avgLatency, + minLatency, + maxLatency), + dimensions, + 0); + db.putMetric( + new Metric( + FaultDetectionMetric.FOLLOWER_CHECK_FAILURE.toString(), + sumFault, + avgFault, + minFault, + maxFault), + dimensions, + 0); + } else if (r.get(FaultDetectionMetricsSnapshot.Fields.FAULT_DETECTION_TYPE.toString()).toString() + .equals(PerformanceAnalyzerMetrics.FAULT_DETECTION_LEADER_CHECK)) { + db.putMetric( + new Metric( + FaultDetectionMetric.LEADER_CHECK_LATENCY.toString(), + sumLatency, + avgLatency, + minLatency, + maxLatency), + dimensions, + 0); + db.putMetric( + new Metric( + FaultDetectionMetric.LEADER_CHECK_FAILURE.toString(), + sumFault, + avgFault, + minFault, + maxFault), + dimensions, + 0); + } } + long mFinalT = System.currentTimeMillis(); + PerformanceAnalyzerApp.READER_METRICS_AGGREGATOR.updateStat( + ReaderMetrics.FAULT_DETECTION_METRICS_EMITTER_EXECUTION_TIME, "", mFinalT - mCurrT); + LOG.debug("Total time taken for writing fault detection metrics to metricsdb: {}", mFinalT - mCurrT); } - long mFinalT = System.currentTimeMillis(); - LOG.debug("Total time taken for writing fault detection metrics to metricsdb: {}", mFinalT - mCurrT); - } public static void emitMasterThrottledTaskMetric( MetricsDB metricsDB, MasterThrottlingMetricsSnapshot masterThrottlingMetricsSnapshot) { diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java index 11477cc10..ca4cc7bc1 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/ReaderMetricsProcessor.java @@ -135,7 +135,7 @@ public ReaderMetricsProcessor(String rootLocation, boolean processNewFormat, fin shardRqMetricsMap = new TreeMap<>(); httpRqMetricsMap = new TreeMap<>(); masterEventMetricsMap = new TreeMap<>(); - faultDetectionMetricsMap = new TreeMap<>(); + faultDetectionMetricsMap = new TreeMap<>(); shardStateMetricsMap = new TreeMap<>(); gcInfoMap = new TreeMap<>(); masterThrottlingMetricsMap = new TreeMap<>(); @@ -277,7 +277,7 @@ public void trimOldSnapshots() throws Exception { trimMap(shardRqMetricsMap, RQ_SNAPSHOTS); trimMap(httpRqMetricsMap, HTTP_RQ_SNAPSHOTS); trimMap(masterEventMetricsMap, MASTER_EVENT_SNAPSHOTS); - trimMap(faultDetectionMetricsMap, FAULT_DETECTION_SNAPSHOTS); + trimMap(faultDetectionMetricsMap, FAULT_DETECTION_SNAPSHOTS); trimMap(shardStateMetricsMap, SHARD_STATE_SNAPSHOTS); trimMap(gcInfoMap, GC_INFO_SNAPSHOTS); trimMap(masterThrottlingMetricsMap, MASTER_THROTTLING_SNAPSHOTS); @@ -423,7 +423,7 @@ private void emitFaultDetectionMetrics(long prevWindowStartTime, MetricsDB metri if (faultDetectionMetricsMap.containsKey(prevWindowStartTime)) { FaultDetectionMetricsSnapshot prevFaultDetectionSnap = faultDetectionMetricsMap.get(prevWindowStartTime); - MetricsEmitter.emitFaultDetectionMetrics(create, metricsDB, prevFaultDetectionSnap); + MetricsEmitter.emitFaultDetectionMetrics(metricsDB, prevFaultDetectionSnap); } else { LOG.debug( "Fault Detection snapshot for the previous window does not exist. Not emitting metrics."); @@ -637,7 +637,7 @@ is ready so it starts to read that file (go back two windows and eventDispatcher.registerEventProcessor(masterThrottlingEventsProcessor); eventDispatcher.registerEventProcessor(shardStateMetricsProcessor); eventDispatcher.registerEventProcessor(clusterDetailsEventsProcessor); - eventDispatcher.registerEventProcessor(faultDetectionProcessor); + eventDispatcher.registerEventProcessor(faultDetectionProcessor); eventDispatcher.registerEventProcessor(garbageCollectorInfoProcessor); eventDispatcher.initializeProcessing( diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshotTests.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshotTests.java index abb7fd04d..60f7b35a6 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshotTests.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/FaultDetectionMetricsSnapshotTests.java @@ -1,17 +1,31 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader; +import static org.junit.Assert.assertEquals; + import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics; +import java.sql.Connection; +import java.sql.DriverManager; import org.jooq.BatchBindStep; import org.jooq.Record; import org.jooq.Result; import org.junit.Before; import org.junit.Test; -import java.sql.Connection; -import java.sql.DriverManager; - -import static org.junit.Assert.assertEquals; - public class FaultDetectionMetricsSnapshotTests { private static final String DB_URL = "jdbc:sqlite:"; private Connection conn; @@ -47,6 +61,6 @@ public void testPutMetrics() { rt.get(0).get(FaultDetectionMetricsSnapshot.Fields.FAULT_DETECTION_TYPE.toString())); assertEquals( 0, - Integer.parseInt(rt.get(0).get("sum_" + FaultDetectionMetricsSnapshot.Fields.ERROR.toString()).toString())); + Integer.parseInt(rt.get(0).get("sum_" + FaultDetectionMetricsSnapshot.Fields.FAULT.toString()).toString())); } } diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitterTests.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitterTests.java index f2181b796..1e21b66d0 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitterTests.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/reader/MetricsEmitterTests.java @@ -396,18 +396,17 @@ public void testFaultDetectionMetricsEmitter() throws Exception { faultDetectionMetricsSnapshot.putStartMetric(10000L, dimensions); faultDetectionMetricsSnapshot.putEndMetric(30000L, 1, dimensions); - DSLContext create = DSL.using(conn, SQLDialect.SQLITE); MetricsDB db = new MetricsDB(1553713438); - MetricsEmitter.emitFaultDetectionMetrics(create, db, faultDetectionMetricsSnapshot); + MetricsEmitter.emitFaultDetectionMetrics(db, faultDetectionMetricsSnapshot); Result res = db.queryMetric( Arrays.asList( - AllMetrics.FaultDetectionMetric.LATENCY_FOLLOWER_CHECK.toString(), - AllMetrics.FaultDetectionMetric.FAILURE_FOLLOWER_CHECK.toString()), + AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_LATENCY.toString(), + AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_FAILURE.toString()), Arrays.asList("avg", "sum"), Arrays.asList(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString())); - Float latency = Float.parseFloat(res.get(0).get(AllMetrics.FaultDetectionMetric.LATENCY_FOLLOWER_CHECK.toString()) + Float latency = Float.parseFloat(res.get(0).get(AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_LATENCY.toString()) .toString()); db.remove(); assertEquals(20490.0f, latency.floatValue(), 0); diff --git a/src/test/resources/reader/1566413960000 b/src/test/resources/reader/1566413960000 index 58e325d86..75448366c 100644 --- a/src/test/resources/reader/1566413960000 +++ b/src/test/resources/reader/1566413960000 @@ -75,32 +75,28 @@ $ {"MemType":"NonHeap","GC_Collection_Event":-2,"GC_Collection_Time":-2,"Heap_Committed":260165632,"Heap_Init":2555904,"Heap_Max":-1,"Heap_Used":248759720} {"MemType":"Heap","GC_Collection_Event":-2,"GC_Collection_Time":-2,"Heap_Committed":17145004032,"Heap_Init":17179869184,"Heap_Max":17145004032,"Heap_Used":5991469464} $ -^fault_detection/follower_check/7627/start +^fault_detection/follower_check/538187/start current_time:1566413936500 SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a -StartTime:1566413988986 -$ -^fault_detection/follower_check/7627/finish +StartTime:1566413988986$ +^fault_detection/follower_check/538187/finish current_time:1566413936550 SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a FinishTime:1566413989986 -error:0 -$ -^fault_detection/follower_check/7649/start +fault:0$ +^fault_detection/follower_check/727187/start current_time:1566413936507 SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a -StartTime:1566413987786 -$ -^fault_detection/follower_check/7649/finish -current_time:1566413936559 +StartTime:1566413987786$ +^fault_detection/follower_check/727187/finish +current_time:1566413936500 SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a FinishTime:1566413987986 -error:0 -$ +fault:0$ ^indices/nyc_taxis_2/27 {"current_time":1566413936500} {"Indexing_ThrottleTime":0,"Cache_Query_Hit":8,"Cache_Query_Miss":0,"Cache_Query_Size":483027,"Cache_FieldData_Eviction":0,"Cache_FieldData_Size":0,"Cache_Request_Hit":0,"Cache_Request_Miss":0,"Cache_Request_Eviction":0,"Cache_Request_Size":0,"Refresh_Event":0,"Refresh_Time":0,"Flush_Event":0,"Flush_Time":0,"Merge_Event":0,"Merge_Time":0,"Merge_CurrentEvent":0,"Indexing_Buffer":0,"Segments_Total":21,"Segments_Memory":1508419,"Terms_Memory":1066993,"StoredFields_Memory":116608,"TermVectors_Memory":1066993,"Norms_Memory":0,"Points_Memory":276918,"DocValues_Memory":47900,"IndexWriter_Memory":0,"VersionMap_Memory":0,"Bitset_Memory":0}$ diff --git a/src/test/resources/reader/1566413965000 b/src/test/resources/reader/1566413965000 index eae3b03b7..6b0fc4050 100644 --- a/src/test/resources/reader/1566413965000 +++ b/src/test/resources/reader/1566413965000 @@ -74,32 +74,28 @@ $ {"MemType":"NonHeap","GC_Collection_Event":-2,"GC_Collection_Time":-2,"Heap_Committed":260427776,"Heap_Init":2555904,"Heap_Max":-1,"Heap_Used":249123672} {"MemType":"Heap","GC_Collection_Event":-2,"GC_Collection_Time":-2,"Heap_Committed":17145004032,"Heap_Init":17179869184,"Heap_Max":17145004032,"Heap_Used":5973041688} $ -^fault_detection/follower_check/7627/start +^fault_detection/follower_check/538187/start current_time:1566413966497 SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a -StartTime:1566413988986 -$ -^fault_detection/follower_check/7627/finish +StartTime:1566413988986$ +^fault_detection/follower_check/538187/finish current_time:1566413966497 SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a FinishTime:1566413989986 -error:0 -$ -^fault_detection/follower_check/7649/start +fault:0$ +^fault_detection/follower_check/852187/start current_time:1566413966497 SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a -StartTime:1566413987786 -$ -^fault_detection/follower_check/7649/finish +StartTime:1566413987786$ +^fault_detection/follower_check/852187/finish current_time:1566413966497 SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a FinishTime:1566413987986 -error:0 -$ +fault:0$ ^indices/nyc_taxis/27 {"current_time":1566413966497} {"Indexing_ThrottleTime":0,"Cache_Query_Hit":0,"Cache_Query_Miss":0,"Cache_Query_Size":0,"Cache_FieldData_Eviction":0,"Cache_FieldData_Size":0,"Cache_Request_Hit":0,"Cache_Request_Miss":0,"Cache_Request_Eviction":0,"Cache_Request_Size":0,"Refresh_Event":2,"Refresh_Time":0,"Flush_Event":0,"Flush_Time":0,"Merge_Event":0,"Merge_Time":0,"Merge_CurrentEvent":0,"Indexing_Buffer":0,"Segments_Total":0,"Segments_Memory":0,"Terms_Memory":0,"StoredFields_Memory":0,"TermVectors_Memory":0,"Norms_Memory":0,"Points_Memory":0,"DocValues_Memory":0,"IndexWriter_Memory":0,"VersionMap_Memory":0,"Bitset_Memory":0}$ diff --git a/src/test/resources/reader/1566413970000 b/src/test/resources/reader/1566413970000 index 9dc96dc61..8bc389e30 100644 --- a/src/test/resources/reader/1566413970000 +++ b/src/test/resources/reader/1566413970000 @@ -75,32 +75,28 @@ $ {"MemType":"NonHeap","GC_Collection_Event":-2,"GC_Collection_Time":-2,"Heap_Committed":260427776,"Heap_Init":2555904,"Heap_Max":-1,"Heap_Used":249158360} {"MemType":"Heap","GC_Collection_Event":-2,"GC_Collection_Time":-2,"Heap_Committed":17145004032,"Heap_Init":17179869184,"Heap_Max":17145004032,"Heap_Used":8569832928} $ -^fault_detection/follower_check/7627/start +^fault_detection/follower_check/654187/start current_time:1566413996768 SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a -StartTime:1566413988986 -$ -^fault_detection/follower_check/7627/finish +StartTime:1566413988986$ +^fault_detection/follower_check/654187/finish current_time:1566413996768 SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a FinishTime:1566413989986 -error:0 -$ -^fault_detection/follower_check/7649/start +fault:0$ +^fault_detection/follower_check/852187/start current_time:1566413996768 SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a -StartTime:1566413987786 -$ -^fault_detection/follower_check/7649/finish +StartTime:1566413987786$ +^fault_detection/follower_check/852187/finish current_time:1566413996768 SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a FinishTime:1566413987986 -error:0 -$ +fault:0$ ^indices/nyc_taxis_1/27 {"current_time":1566413996768} {"Indexing_ThrottleTime":0,"Cache_Query_Hit":0,"Cache_Query_Miss":0,"Cache_Query_Size":0,"Cache_FieldData_Eviction":0,"Cache_FieldData_Size":0,"Cache_Request_Hit":0,"Cache_Request_Miss":0,"Cache_Request_Eviction":0,"Cache_Request_Size":0,"Refresh_Event":0,"Refresh_Time":0,"Flush_Event":0,"Flush_Time":0,"Merge_Event":0,"Merge_Time":0,"Merge_CurrentEvent":0,"Indexing_Buffer":11114952,"Segments_Total":0,"Segments_Memory":0,"Terms_Memory":0,"StoredFields_Memory":0,"TermVectors_Memory":0,"Norms_Memory":0,"Points_Memory":0,"DocValues_Memory":0,"IndexWriter_Memory":11114952,"VersionMap_Memory":0,"Bitset_Memory":0}$