Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Publish fault detection metrics #470

Merged
merged 5 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,52 @@ public static class Constants {
}
}

public enum FaultDetectionMetric implements MetricValue {
LATENCY_FOLLOWER_CHECK(Constants.LATENCY_FOLLOWER_CHECK),
LATENCY_LEADER_CHECK(Constants.LATENCY_LEADER_CHECK),
FAILURE_FOLLOWER_CHECK(Constants.FAILURE_FOLLOWER_CHECK),
FAILURE_LEADER_CHECK(Constants.FAILURE_LEADER_CHECK);

khushbr marked this conversation as resolved.
Show resolved Hide resolved
private final String value;

FaultDetectionMetric(String value) {
this.value = value;
}

@Override
public String toString() {
return value;
}

public static class Constants {
public static final String LATENCY_FOLLOWER_CHECK = "Latency_FollowerCheck";
public static final String LATENCY_LEADER_CHECK = "Latency_LeaderCheck";
public static final String FAILURE_FOLLOWER_CHECK = "Failure_FollowerCheck";
public static final String FAILURE_LEADER_CHECK = "Failure_LeaderCheck";
khushbr marked this conversation as resolved.
Show resolved Hide resolved
}
}

public enum FaultDetectionDimension implements MetricDimension {
SOURCE_NODE_ID(Constants.SOURCE_NODE_ID),
TARGET_NODE_ID(Constants.TARGET_NODE_ID);

private final String value;

FaultDetectionDimension(String value) {
this.value = value;
}

@Override
public String toString() {
return value;
}

public static class Constants {
public static final String SOURCE_NODE_ID = "SourceNodeID";
public static final String TARGET_NODE_ID = "TargetNodeID";
}
}

public enum CommonDimension implements MetricDimension {
INDEX_NAME(Constants.INDEX_NAME_VALUE),
OPERATION(Constants.OPERATION_VALUE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class PerformanceAnalyzerMetrics {
public static final String sShardFetchPath = "shardfetch";
public static final String sShardQueryPath = "shardquery";
public static final String sMasterTaskPath = "master_task";
public static final String sFaultDetection = "fault_detection";
public static final String sHttpPath = "http";
public static final String sOSPath = "os_metrics";
public static final String sHeapPath = "heap_metrics";
Expand All @@ -60,6 +61,9 @@ public class PerformanceAnalyzerMetrics {
public static final String MASTER_CURRENT = "current";
public static final String MASTER_META_DATA = "metadata";
public static final String METRIC_CURRENT_TIME = "current_time";
public static final String FAULT_DETECTION_FOLLOWER_CHECK = "follower_check";
public static final String FAULT_DETECTION_LEADER_CHECK = "leader_check";
public static final String ERROR = "error";
khushbr marked this conversation as resolved.
Show resolved Hide resolved
public static final int QUEUE_SIZE = PluginSettings.instance().getWriterQueueSize();

// TODO: Comeup with a more sensible number.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,26 @@ public class MetricsModel {
new MetricAttributes(
MetricUnits.MILLISECOND.toString(), AllMetrics.MasterMetricDimensions.values()));

allMetricsInitializer.put(
AllMetrics.FaultDetectionMetric.LATENCY_FOLLOWER_CHECK.toString(),
new MetricAttributes(
MetricUnits.MILLISECOND.toString(), AllMetrics.FaultDetectionDimension.values()));

allMetricsInitializer.put(
AllMetrics.FaultDetectionMetric.LATENCY_LEADER_CHECK.toString(),
new MetricAttributes(
MetricUnits.MILLISECOND.toString(), AllMetrics.FaultDetectionDimension.values()));

allMetricsInitializer.put(
AllMetrics.FaultDetectionMetric.FAILURE_FOLLOWER_CHECK.toString(),
new MetricAttributes(
MetricUnits.COUNT.toString(), AllMetrics.FaultDetectionDimension.values()));

allMetricsInitializer.put(
AllMetrics.FaultDetectionMetric.FAILURE_LEADER_CHECK.toString(),
new MetricAttributes(
MetricUnits.COUNT.toString(), AllMetrics.FaultDetectionDimension.values()));

khushbr marked this conversation as resolved.
Show resolved Hide resolved

ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.reader;
khushbr marked this conversation as resolved.
Show resolved Hide resolved

import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader_writer_shared.Event;
import java.io.File;
import java.sql.Connection;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.BatchBindStep;

public class FaultDetectionMetricsProcessor implements EventProcessor {
private static final Logger LOG = LogManager.getLogger(FaultDetectionMetricsProcessor.class);
private FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot;
private long startTime;
private long endTime;
private BatchBindStep handle;

public FaultDetectionMetricsProcessor(FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot) {
this.faultDetectionMetricsSnapshot = faultDetectionMetricsSnapshot;
}

static FaultDetectionMetricsProcessor buildFaultDetectionMetricsProcessor(
long currWindowStartTime,
Connection conn,
NavigableMap<Long, FaultDetectionMetricsSnapshot> faultDetectionMetricsMap) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's move faultDetectionMetricsMap to next line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (faultDetectionMetricsMap.get(currWindowStartTime) == null) {
FaultDetectionMetricsSnapshot faultDetectionMetricsSnapshot =
new FaultDetectionMetricsSnapshot(conn, currWindowStartTime);
Map.Entry<Long, FaultDetectionMetricsSnapshot> entry = faultDetectionMetricsMap.lastEntry();
if (entry != null) {
faultDetectionMetricsSnapshot.rolloverInFlightRequests(entry.getValue());
}
faultDetectionMetricsMap.put(currWindowStartTime, faultDetectionMetricsSnapshot);
return new FaultDetectionMetricsProcessor(faultDetectionMetricsSnapshot);
} else {
return new FaultDetectionMetricsProcessor(faultDetectionMetricsMap.get(currWindowStartTime));
}
}

@Override
public void initializeProcessing(long startTime, long endTime) {
this.startTime = startTime;
this.endTime = endTime;
this.handle = faultDetectionMetricsSnapshot.startBatchPut();
}

@Override
public void finalizeProcessing() {
if (handle.size() > 0) {
handle.execute();
}
LOG.debug("Final Fault Detection request metrics {}", faultDetectionMetricsSnapshot.fetchAll());
}

@Override
public void processEvent(Event event) {
String[] keyItems = event.key.split(File.separatorChar == '\\' ? "\\\\" : File.separator);
if (keyItems[0].equals(PerformanceAnalyzerMetrics.sFaultDetection)) {
if (keyItems[3].equals(PerformanceAnalyzerMetrics.START_FILE_NAME)) {
emitStartMetric(event, keyItems);
} else if (keyItems[3].equals(PerformanceAnalyzerMetrics.FINISH_FILE_NAME)) {
Comment on lines +83 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the keyItems or event.key will always have the expected values an index 0 and 3 here ?
What happens if keyItems[3] throws an ArrayIndexOutOfBounds Exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That shouldn't be the case. If this happens somehow, it will have an ArrayIndexOutOfBoundsException. Can add assert here which can check the size of keyItems.

emitFinishMetric(event, keyItems);
}
}
}

@Override
public boolean shouldProcessEvent(Event event) {
return event.key.contains(PerformanceAnalyzerMetrics.sFaultDetection);
}

@Override
public void commitBatchIfRequired() {
if (handle.size() > BATCH_LIMIT) {
handle.execute();
handle = faultDetectionMetricsSnapshot.startBatchPut();
}
}

// A keyItem is of the form : [fault_detection, follower_check, 76532, start]
//
// Example value part of the entry is:
// current_time:1566413979979
// StartTime:1566413987986
// SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a
// TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a
// $
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us either make this a java doc string or movie this multi-line comment to inside the function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made it javadoc

private void emitStartMetric(Event entry, String[] keyItems) {
Map<String, String> keyValueMap = ReaderMetricsProcessor.extractEntryData(entry.value);

String sourceNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString());
String targetNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString());
String startTimeVal = keyValueMap.get(AllMetrics.CommonMetric.START_TIME.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import AllMetrics.FaultDetectionDimension instead of AllMetrics ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

String fault_detection_type = keyItems[1];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's movie this line to within the try block, after line 103

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

try {
long st = Long.parseLong(startTimeVal);
// A keyItem is of the form : [fault_detection, follower_check, 76543, start]
String rid = keyItems[2];
handle.bind(rid, sourceNodeId, targetNodeId, fault_detection_type, st, null, 0);
} catch (NumberFormatException e) {
LOG.error("Unable to parse string. StartTime:{}", startTimeVal);
StatsCollector.instance().logException(StatExceptionCode.READER_PARSER_ERROR);
throw e;
}
}

// A keyItem is of the form : [fault_detection, follower_check, 76532, finish]
//
// Example value part of the entry is:
// current_time:1566413979979
// FinishTime:1566413987986
// SourceNodeID:g52i9a93a762cd59dda8d3379b09a752a
// TargetNodeID:b2a5a93a762cd59dda8d3379b09a752a
// Error:0
// $
private void emitFinishMetric(Event entry, String[] keyItems) {
Map<String, String> keyValueMap = ReaderMetricsProcessor.extractEntryData(entry.value);

String sourceNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString());
String targetNodeId = keyValueMap.get(AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString());
String finishTimeVal = keyValueMap.get(AllMetrics.CommonMetric.FINISH_TIME.toString());
String errorString = keyValueMap.get(PerformanceAnalyzerMetrics.ERROR);
String fault_detection_type = keyItems[1];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar, move this to within try block after line 133

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

try {
long et = Long.parseLong(finishTimeVal);
int error = Integer.parseInt(errorString);
// A keyItem is of the form : [fault_detection, follower_check, 76543, finish]
String rid = keyItems[2];
handle.bind(rid, sourceNodeId, targetNodeId, fault_detection_type, null, et, error);
} catch (NumberFormatException e) {
LOG.error("Unable to parse string. StartTime:{}, Error:{}", finishTimeVal, errorString);
StatsCollector.instance().logException(StatExceptionCode.READER_PARSER_ERROR);
throw e;
}
}
}
Loading