Skip to content

Commit

Permalink
Thread Metrics RCA. (#180)
Browse files Browse the repository at this point in the history
* Adds Thread Metrics RCA

Signed-off-by: Surya Sashank Nistala <[email protected]>

* Return healthy flow unit from ThreadMetricsRca

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep authored May 27, 2022
1 parent bc570a3 commit bac0bf1
Show file tree
Hide file tree
Showing 10 changed files with 637 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,7 @@ public enum CommonDimension implements MetricDimension {
SHARD_ROLE(Constants.SHARD_ROLE_VALUE),
SHARD_ID(Constants.SHARDID_VALUE),
EXCEPTION(Constants.EXCEPTION_VALUE),
THREAD_NAME(Constants.THREAD_NAME),
FAILED(Constants.FAILED_VALUE);

private final String value;
Expand All @@ -1205,6 +1206,7 @@ public static class Constants {
public static final String SHARD_ROLE_VALUE = "ShardRole";
public static final String EXCEPTION_VALUE = "Exception";
public static final String FAILED_VALUE = "Failed";
public static final String THREAD_NAME = "ThreadName";
}
}

Expand Down Expand Up @@ -1250,7 +1252,8 @@ public enum AggregatedOSDimension implements MetricDimension {
INDEX_NAME(CommonDimension.INDEX_NAME.toString()),
OPERATION(CommonDimension.OPERATION.toString()),
SHARD_ROLE(CommonDimension.SHARD_ROLE.toString()),
SHARD_ID(CommonDimension.SHARD_ID.toString());
SHARD_ID(CommonDimension.SHARD_ID.toString()),
THREAD_NAME(CommonDimension.THREAD_NAME.toString());

private final String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,21 @@ public enum ReaderMetrics implements MeasurementSet {
Statistics.COUNT,
Statistics.SUM)),

/** Number of transport threads in BLOCKED state. */
BLOCKED_TRANSPORT_THREAD_COUNT("BlockedTransportThreadCount", "count", Statistics.MAX),

/** Number of transport threads in WAITING or TIMED_WAITING state. */
WAITED_TRANSPORT_THREAD_COUNT("WaitedTransportThreadCount", "count", Statistics.MAX),

/** Max amount of time a transport thread has been BLOCKED in the past 60 seconds. */
MAX_TRANSPORT_THREAD_BLOCKED_TIME("MaxTransportThreadBlockedTime", "seconds", Statistics.MAX),

/**
* Max amount of time a transport thread has been in WAITING or TIMED_WAITING state in the past
* 60 seconds.
*/
MAX_TRANSPORT_THREAD_WAITED_TIME("MaxTransportThreadWaitedTime", "seconds", Statistics.MAX),

/**
* A blanket exception code for {@link
* org.opensearch.performanceanalyzer.reader.ReaderMetricsProcessor} failures.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.IndexWriter_Memory;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_QueueCapacity;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_RejectedReqs;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Thread_Blocked_Time;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Thread_Waited_Time;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.VersionMap_Memory;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
Expand Down Expand Up @@ -76,6 +78,7 @@
import org.opensearch.performanceanalyzer.rca.store.rca.cluster.QueueRejectionClusterRca;
import org.opensearch.performanceanalyzer.rca.store.rca.cluster.ShardRequestCacheClusterRca;
import org.opensearch.performanceanalyzer.rca.store.rca.hot_node.HighCpuRca;
import org.opensearch.performanceanalyzer.rca.store.rca.hot_node.ThreadMetricsRca;
import org.opensearch.performanceanalyzer.rca.store.rca.hotheap.HighHeapUsageOldGenRca;
import org.opensearch.performanceanalyzer.rca.store.rca.hotheap.HighHeapUsageYoungGenRca;
import org.opensearch.performanceanalyzer.rca.store.rca.hotshard.HotShardClusterRca;
Expand All @@ -102,6 +105,9 @@ public class OpenSearchAnalysisGraph extends AnalysisGraph {
public void construct() {
Metric heapUsed = new Heap_Used(EVALUATION_INTERVAL_SECONDS);
Metric gcEvent = new GC_Collection_Event(EVALUATION_INTERVAL_SECONDS);
Thread_Blocked_Time threadBlockedTime =
new Thread_Blocked_Time(EVALUATION_INTERVAL_SECONDS);
Thread_Waited_Time threadWaitedTime = new Thread_Waited_Time(EVALUATION_INTERVAL_SECONDS);
Heap_Max heapMax = new Heap_Max(EVALUATION_INTERVAL_SECONDS);
Metric gc_Collection_Time = new GC_Collection_Time(EVALUATION_INTERVAL_SECONDS);
GC_Type gcType = new GC_Type(EVALUATION_INTERVAL_SECONDS);
Expand Down Expand Up @@ -131,13 +137,21 @@ public void construct() {
cpuUtilizationGroupByOperation.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
threadBlockedTime.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
threadWaitedTime.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);

addLeaf(heapUsed);
addLeaf(gcEvent);
addLeaf(gcType);
addLeaf(heapMax);
addLeaf(gc_Collection_Time);
addLeaf(cpuUtilizationGroupByOperation);
addLeaf(threadBlockedTime);
addLeaf(threadWaitedTime);

// add node stats metrics
List<Metric> nodeStatsMetrics = constructNodeStatsMetrics();
Expand Down Expand Up @@ -168,14 +182,25 @@ public void construct() {
RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
highCpuRca.addAllUpstreams(Collections.singletonList(cpuUtilizationGroupByOperation));

Rca<ResourceFlowUnit<HotNodeSummary>> threadMetricsRca =
new ThreadMetricsRca(threadBlockedTime, threadWaitedTime, RCA_PERIOD);
threadMetricsRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
threadMetricsRca.addAllUpstreams(List.of(threadBlockedTime, threadWaitedTime));

Rca<ResourceFlowUnit<HotNodeSummary>> hotJVMNodeRca =
new HotNodeRca(
RCA_PERIOD, highHeapUsageOldGenRca, highHeapUsageYoungGenRca, highCpuRca);
hotJVMNodeRca.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_MASTER_NODE);
hotJVMNodeRca.addAllUpstreams(
Arrays.asList(highHeapUsageOldGenRca, highHeapUsageYoungGenRca, highCpuRca));
Arrays.asList(
highHeapUsageOldGenRca,
highHeapUsageYoungGenRca,
highCpuRca,
threadMetricsRca));

HighHeapUsageClusterRca highHeapUsageClusterRca =
new HighHeapUsageClusterRca(RCA_PERIOD, hotJVMNodeRca);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.rca.store.rca.hot_node;


import java.util.function.Predicate;
import org.opensearch.performanceanalyzer.rca.framework.metrics.ReaderMetrics;

/**
 * Bundles the pieces used together when analysing one group of threads: a string predicate, two
 * sliding windows (one for blocked time, one for waited time) and the four {@link ReaderMetrics}
 * entries associated with the group.
 *
 * <p>The two sliding windows are created by this class and exposed through getters; everything
 * else is supplied by the caller and returned unchanged.
 */
public class ThreadAnalysis {
    /** Caller-supplied predicate; presumably applied to thread names — confirm against callers. */
    private final Predicate<String> typeFilter;

    private final ReaderMetrics blockedThreadCountMetric;
    private final ReaderMetrics waitedThreadCountMetric;
    private final ReaderMetrics maxBlockedTimeMetric;
    private final ReaderMetrics maxWaitedTimeMetric;

    /** Window owned by this instance for blocked-time samples. */
    private final ThreadMetricsSlidingWindow blockedTimeWindow = new ThreadMetricsSlidingWindow();

    /** Window owned by this instance for waited-time samples. */
    private final ThreadMetricsSlidingWindow waitedTimeWindow = new ThreadMetricsSlidingWindow();

    /**
     * Creates an analysis holder with fresh, empty sliding windows.
     *
     * @param typeFilter predicate stored as-is and returned by {@link #getTypeFilter()}
     * @param blockedThreadCountMetric metric returned by {@link #getBlockedThreadCountMetric()}
     * @param waitedThreadCount metric returned by {@link #getWaitedThreadCountMetric()}
     * @param maxBlockedTime metric returned by {@link #getMaxBlockedTimeMetric()}
     * @param maxWaitedTimeMetric metric returned by {@link #getMaxWaitedTimeMetric()}
     */
    public ThreadAnalysis(
            Predicate<String> typeFilter,
            ReaderMetrics blockedThreadCountMetric,
            ReaderMetrics waitedThreadCount,
            ReaderMetrics maxBlockedTime,
            ReaderMetrics maxWaitedTimeMetric) {
        this.typeFilter = typeFilter;
        this.blockedThreadCountMetric = blockedThreadCountMetric;
        this.waitedThreadCountMetric = waitedThreadCount;
        this.maxBlockedTimeMetric = maxBlockedTime;
        this.maxWaitedTimeMetric = maxWaitedTimeMetric;
    }

    /** @return the predicate passed to the constructor */
    public Predicate<String> getTypeFilter() {
        return this.typeFilter;
    }

    /** @return the sliding window holding blocked-time samples */
    public ThreadMetricsSlidingWindow getBlockedTimeWindow() {
        return this.blockedTimeWindow;
    }

    /** @return the sliding window holding waited-time samples */
    public ThreadMetricsSlidingWindow getWaitedTimeWindow() {
        return this.waitedTimeWindow;
    }

    /** @return the blocked-thread-count metric passed to the constructor */
    public ReaderMetrics getBlockedThreadCountMetric() {
        return this.blockedThreadCountMetric;
    }

    /** @return the waited-thread-count metric passed to the constructor */
    public ReaderMetrics getWaitedThreadCountMetric() {
        return this.waitedThreadCountMetric;
    }

    /** @return the max-blocked-time metric passed to the constructor */
    public ReaderMetrics getMaxBlockedTimeMetric() {
        return this.maxBlockedTimeMetric;
    }

    /** @return the max-waited-time metric passed to the constructor */
    public ReaderMetrics getMaxWaitedTimeMetric() {
        return this.maxWaitedTimeMetric;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.rca.store.rca.hot_node;

/**
 * Immutable holder for a single thread-level measurement: the thread's name, the measured value,
 * the timestamp attached to the sample and the operation label that accompanied it.
 *
 * <p>All fields are stored exactly as given; no validation or normalisation is performed.
 */
public class ThreadMetric {
    private final String name;
    private final String operation;
    private final double value;
    private final long timeStamp;

    /**
     * Builds a measurement from its four components.
     *
     * @param threadName name reported by {@link #getName()}
     * @param val numeric reading reported by {@link #getValue()}
     * @param timeStamp timestamp reported by {@link #getTimeStamp()}
     * @param operation operation label reported by {@link #getOperation()}
     */
    public ThreadMetric(String threadName, double val, long timeStamp, String operation) {
        this.name = threadName;
        this.operation = operation;
        this.value = val;
        this.timeStamp = timeStamp;
    }

    /** @return the thread name supplied at construction */
    public String getName() {
        return this.name;
    }

    /** @return the numeric reading supplied at construction */
    public double getValue() {
        return this.value;
    }

    /** @return the timestamp supplied at construction */
    public long getTimeStamp() {
        return this.timeStamp;
    }

    /** @return the operation label supplied at construction */
    public String getOperation() {
        return this.operation;
    }
}
Loading

0 comments on commit bac0bf1

Please sign in to comment.