From 393f4791e4bdbc1560dac75373bcc1b85493b164 Mon Sep 17 00:00:00 2001 From: Ruizhen Guo <55893852+rguo-aws@users.noreply.github.com> Date: Wed, 3 Jun 2020 18:54:24 -0700 Subject: [PATCH] collect queue latency metric in PerformanceAnalyzer (#111) Authored-By: rguo-aws --- licenses/performanceanalyzer-rca-1.3.jar.sha1 | 2 +- .../ThreadPoolMetricsCollector.java | 63 +++++++++++++++---- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/licenses/performanceanalyzer-rca-1.3.jar.sha1 b/licenses/performanceanalyzer-rca-1.3.jar.sha1 index 7e552d92..c2d0c6ea 100644 --- a/licenses/performanceanalyzer-rca-1.3.jar.sha1 +++ b/licenses/performanceanalyzer-rca-1.3.jar.sha1 @@ -1 +1 @@ -67a91489b55ec6cd563c823e6790a8909bd334c3 \ No newline at end of file +7ab11a3251d4bd2b09ddca5a4f01e4ef3dbdc9ed \ No newline at end of file diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ThreadPoolMetricsCollector.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ThreadPoolMetricsCollector.java index 02a37917..de807433 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ThreadPoolMetricsCollector.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ThreadPoolMetricsCollector.java @@ -15,17 +15,20 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors; -import java.util.Iterator; - -import org.elasticsearch.threadpool.ThreadPoolStats.Stats; - import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolValue; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsProcessor; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; +import java.lang.reflect.Method; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Iterator; +import org.elasticsearch.threadpool.ThreadPoolStats.Stats; public class ThreadPoolMetricsCollector extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor { public static final int SAMPLING_TIME_INTERVAL = MetricsConfiguration.CONFIG_MAP.get(ThreadPoolMetricsCollector.class).samplingInterval; @@ -49,12 +52,26 @@ public void collectMetrics(long startTime) { while (statsIterator.hasNext()) { Stats stats = statsIterator.next(); + ThreadPoolStatus threadPoolStatus = AccessController.doPrivileged((PrivilegedAction) () -> { + try { + //This is for backward compatibility. core ES may or may not emit latency metric + // (depending on whether the patch has been applied or not) + // so we need to use reflection to check whether getLatency() method exist in ThreadPoolStats.java. + Method getLantencyMethod = Stats.class.getMethod("getLatency"); + double latency = (Double) getLantencyMethod.invoke(stats); + return new ThreadPoolStatus(stats.getName(), + stats.getQueue(), stats.getRejected(), + stats.getThreads(), stats.getActive(), latency); + } catch (Exception e) { + //core ES does not have the latency patch. send the threadpool metrics without adding latency. + return new ThreadPoolStatus(stats.getName(), + stats.getQueue(), stats.getRejected(), + stats.getThreads(), stats.getActive()); + } + }); value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(new ThreadPoolStatus(stats.getName(), - stats.getQueue(), stats.getRejected(), - stats.getThreads(), stats.getActive()).serialize()); + .append(threadPoolStatus.serialize()); } - saveMetricValues(value.toString(), startTime); } @@ -74,17 +91,34 @@ public static class ThreadPoolStatus extends MetricStatus { public final long rejected; public final int threadsCount; public final int threadsActive; + @JsonInclude(Include.NON_NULL) + public final Double queueLatency; + + public ThreadPoolStatus(String type, + int queueSize, + long rejected, + int threadsCount, + int threadsActive) { + this.type = type; + this.queueSize = queueSize; + this.rejected = rejected; + this.threadsCount = threadsCount; + this.threadsActive = threadsActive; + this.queueLatency = null; + } public ThreadPoolStatus(String type, - int queueSize, - long rejected, - int threadsCount, - int threadsActive) { + int queueSize, + long rejected, + int threadsCount, + int threadsActive, + double queueLatency) { this.type = type; this.queueSize = queueSize; this.rejected = rejected; this.threadsCount = threadsCount; this.threadsActive = threadsActive; + this.queueLatency = queueLatency; } @JsonProperty(ThreadPoolDimension.Constants.TYPE_VALUE) @@ -111,5 +145,10 @@ public int getThreadsCount() { public int getThreadsActive() { return threadsActive; } + + @JsonProperty(ThreadPoolValue.Constants.QUEUE_LATENCY_VALUE) + public Double getQueueLatency() { + return queueLatency; + } } }