Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
collect queue latency metric in PerformanceAnalyzer (#111)
Browse files Browse the repository at this point in the history
Authored-By: rguo-aws
  • Loading branch information
rguo-aws authored Jun 4, 2020
1 parent e4f209e commit 393f479
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 13 deletions.
2 changes: 1 addition & 1 deletion licenses/performanceanalyzer-rca-1.3.jar.sha1
Original file line number Diff line number Diff line change
@@ -1 +1 @@
67a91489b55ec6cd563c823e6790a8909bd334c3
7ab11a3251d4bd2b09ddca5a4f01e4ef3dbdc9ed
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors;

import java.util.Iterator;

import org.elasticsearch.threadpool.ThreadPoolStats.Stats;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolValue;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsProcessor;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Iterator;
import org.elasticsearch.threadpool.ThreadPoolStats.Stats;

public class ThreadPoolMetricsCollector extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor {
public static final int SAMPLING_TIME_INTERVAL = MetricsConfiguration.CONFIG_MAP.get(ThreadPoolMetricsCollector.class).samplingInterval;
Expand All @@ -49,12 +52,26 @@ public void collectMetrics(long startTime) {

while (statsIterator.hasNext()) {
Stats stats = statsIterator.next();
ThreadPoolStatus threadPoolStatus = AccessController.doPrivileged((PrivilegedAction<ThreadPoolStatus>) () -> {
try {
//This is for backward compatibility. core ES may or may not emit latency metric
// (depending on whether the patch has been applied or not)
// so we need to use reflection to check whether getLatency() method exist in ThreadPoolStats.java.
Method getLantencyMethod = Stats.class.getMethod("getLatency");
double latency = (Double) getLantencyMethod.invoke(stats);
return new ThreadPoolStatus(stats.getName(),
stats.getQueue(), stats.getRejected(),
stats.getThreads(), stats.getActive(), latency);
} catch (Exception e) {
//core ES does not have the latency patch. send the threadpool metrics without adding latency.
return new ThreadPoolStatus(stats.getName(),
stats.getQueue(), stats.getRejected(),
stats.getThreads(), stats.getActive());
}
});
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(new ThreadPoolStatus(stats.getName(),
stats.getQueue(), stats.getRejected(),
stats.getThreads(), stats.getActive()).serialize());
.append(threadPoolStatus.serialize());
}

saveMetricValues(value.toString(), startTime);
}

Expand All @@ -74,17 +91,34 @@ public static class ThreadPoolStatus extends MetricStatus {
public final long rejected;
public final int threadsCount;
public final int threadsActive;
@JsonInclude(Include.NON_NULL)
public final Double queueLatency;

public ThreadPoolStatus(String type,
int queueSize,
long rejected,
int threadsCount,
int threadsActive) {
this.type = type;
this.queueSize = queueSize;
this.rejected = rejected;
this.threadsCount = threadsCount;
this.threadsActive = threadsActive;
this.queueLatency = null;
}

public ThreadPoolStatus(String type,
int queueSize,
long rejected,
int threadsCount,
int threadsActive) {
int queueSize,
long rejected,
int threadsCount,
int threadsActive,
double queueLatency) {
this.type = type;
this.queueSize = queueSize;
this.rejected = rejected;
this.threadsCount = threadsCount;
this.threadsActive = threadsActive;
this.queueLatency = queueLatency;
}

@JsonProperty(ThreadPoolDimension.Constants.TYPE_VALUE)
Expand All @@ -111,5 +145,10 @@ public int getThreadsCount() {
public int getThreadsActive() {
return threadsActive;
}

@JsonProperty(ThreadPoolValue.Constants.QUEUE_LATENCY_VALUE)
public Double getQueueLatency() {
return queueLatency;
}
}
}

0 comments on commit 393f479

Please sign in to comment.