diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ThreadPoolMetricsCollector.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ThreadPoolMetricsCollector.java index 25397ba2..86609452 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ThreadPoolMetricsCollector.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ThreadPoolMetricsCollector.java @@ -24,20 +24,25 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import java.lang.reflect.Method; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import org.elasticsearch.threadpool.ThreadPoolStats.Stats; public class ThreadPoolMetricsCollector extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor { public static final int SAMPLING_TIME_INTERVAL = MetricsConfiguration.CONFIG_MAP.get(ThreadPoolMetricsCollector.class).samplingInterval; private static final int KEYS_PATH_LENGTH = 0; private StringBuilder value; + private final Map statsRecordMap; public ThreadPoolMetricsCollector() { super(SAMPLING_TIME_INTERVAL, "ThreadPoolMetrics"); value = new StringBuilder(); + statsRecordMap = new HashMap<>(); } @Override @@ -52,6 +57,24 @@ public void collectMetrics(long startTime) { while (statsIterator.hasNext()) { Stats stats = statsIterator.next(); + long rejectionDelta = 0; + String threadPoolName = stats.getName(); + if (statsRecordMap.containsKey(threadPoolName)) { + ThreadPoolStatsRecord lastRecord = statsRecordMap.get(threadPoolName); + // if the timestamp in previous record is greater than 15s (3 * intervals), + // then the scheduler might hang or freeze due to long GC etc. We simply drop + // previous record here and set rejectionDelta to 0. + if (startTime - lastRecord.getTimestamp() <= SAMPLING_TIME_INTERVAL * 3) { + rejectionDelta = stats.getRejected() - lastRecord.getRejected(); + // we might not run into this as rejection is a LongAdder which never decrement its count. + // regardless, let's set it to 0 to be safe. + if (rejectionDelta < 0) { + rejectionDelta = 0; + } + } + } + statsRecordMap.put(threadPoolName, new ThreadPoolStatsRecord(startTime, stats.getRejected())); + final long finalRejectionDelta = rejectionDelta; ThreadPoolStatus threadPoolStatus = AccessController.doPrivileged((PrivilegedAction) () -> { try { //This is for backward compatibility. core ES may or may not emit latency metric @@ -64,13 +87,13 @@ public void collectMetrics(long startTime) { Method getCapacityMethod = Stats.class.getMethod("getCapacity"); int capacity = (Integer) getCapacityMethod.invoke(stats); return new ThreadPoolStatus(stats.getName(), - stats.getQueue(), stats.getRejected(), + stats.getQueue(), finalRejectionDelta, stats.getThreads(), stats.getActive(), latency, capacity); } catch (Exception e) { //core ES does not have the latency patch. send the threadpool metrics without adding latency. return new ThreadPoolStatus(stats.getName(), - stats.getQueue(), stats.getRejected(), + stats.getQueue(), finalRejectionDelta, stats.getThreads(), stats.getActive()); } }); @@ -90,6 +113,24 @@ public String getMetricsPath(long startTime, String... keysPath) { return PerformanceAnalyzerMetrics.generatePath(startTime, PerformanceAnalyzerMetrics.sThreadPoolPath); } + private static class ThreadPoolStatsRecord { + private final long timestamp; + private final long rejected; + + ThreadPoolStatsRecord(long timestamp, long rejected) { + this.timestamp = timestamp; + this.rejected = rejected; + } + + public long getTimestamp() { + return timestamp; + } + + public long getRejected() { + return rejected; + } + } + public static class ThreadPoolStatus extends MetricStatus { public final String type; public final int queueSize; @@ -131,6 +172,19 @@ public ThreadPoolStatus(String type, this.queueCapacity = queueCapacity; } + // default constructor for jackson to de-serialize this class + // from json string in unit test + @VisibleForTesting + public ThreadPoolStatus() { + this.type = "testing"; + this.queueSize = -1; + this.rejected = -1; + this.threadsCount = -1; + this.threadsActive = -1; + this.queueLatency = null; + this.queueCapacity = null; + } + @JsonProperty(ThreadPoolDimension.Constants.TYPE_VALUE) public String getType() { return type; diff --git a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ThreadPoolMetricsCollectorTests.java b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ThreadPoolMetricsCollectorTests.java index 3dc0cb3c..ef589ffb 100644 --- a/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ThreadPoolMetricsCollectorTests.java +++ b/src/test/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/collectors/ThreadPoolMetricsCollectorTests.java @@ -15,35 +15,48 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors; -import org.junit.Ignore; -import org.junit.Test; +import static org.junit.Assert.assertEquals; import com.amazon.opendistro.elasticsearch.performanceanalyzer.CustomMetricsLocationTestBase; -import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PluginSettings; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector.ThreadPoolStatus; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration; import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics; -import static org.junit.Assert.assertEquals; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader_writer_shared.Event; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; -@Ignore public class ThreadPoolMetricsCollectorTests extends CustomMetricsLocationTestBase { + private ThreadPoolMetricsCollector threadPoolMetricsCollector; + + @Mock + private ThreadPool mockThreadPool; + + @Before + public void init() { + mockThreadPool = Mockito.mock(ThreadPool.class); + ESResources.INSTANCE.setThreadPool(mockThreadPool); + System.setProperty("performanceanalyzer.metrics.log.enabled", "False"); + MetricsConfiguration.CONFIG_MAP.put(ThreadPoolMetricsCollector.class, MetricsConfiguration.cdefault); + threadPoolMetricsCollector = new ThreadPoolMetricsCollector(); + } + @Test public void testThreadPoolMetrics() { - System.setProperty("performanceanalyzer.metrics.log.enabled", "False"); long startTimeInMills = 1453724339; - - MetricsConfiguration.CONFIG_MAP.put(ThreadPoolMetricsCollector.class, MetricsConfiguration.cdefault); - - ThreadPoolMetricsCollector threadPoolMetricsCollector = new ThreadPoolMetricsCollector(); threadPoolMetricsCollector.saveMetricValues("12321.5464", startTimeInMills); - - - String fetchedValue = PerformanceAnalyzerMetrics.getMetric( - PluginSettings.instance().getMetricsLocation() - + PerformanceAnalyzerMetrics.getTimeInterval(startTimeInMills)+"/thread_pool/"); - PerformanceAnalyzerMetrics.removeMetrics(PluginSettings.instance().getMetricsLocation() - + PerformanceAnalyzerMetrics.getTimeInterval(startTimeInMills)); - assertEquals("12321.5464", fetchedValue); + List metrics = readEvents(); + assertEquals(1, metrics.size()); + assertEquals("12321.5464", metrics.get(0).value); try { threadPoolMetricsCollector.saveMetricValues("12321.5464", startTimeInMills, "123"); @@ -59,4 +72,64 @@ public void testThreadPoolMetrics() { //- expecting exception...2 values passed; 0 expected } } + + @Test + public void testCollectMetrics() throws IOException { + long startTimeInMills = 1453724339; + Mockito.when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat(2)); + threadPoolMetricsCollector.collectMetrics(startTimeInMills); + ThreadPoolStatus threadPoolStatus = readMetrics(); + assertEquals(0, threadPoolStatus.getRejected()); + + startTimeInMills += 5000; + Mockito.when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat(4)); + threadPoolMetricsCollector.collectMetrics(startTimeInMills); + threadPoolStatus = readMetrics(); + assertEquals(2, threadPoolStatus.getRejected()); + + startTimeInMills += 12000; + Mockito.when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat(9)); + threadPoolMetricsCollector.collectMetrics(startTimeInMills); + threadPoolStatus = readMetrics(); + assertEquals(5, threadPoolStatus.getRejected()); + + startTimeInMills += 16000; + Mockito.when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat(20)); + threadPoolMetricsCollector.collectMetrics(startTimeInMills); + threadPoolStatus = readMetrics(); + assertEquals(0, threadPoolStatus.getRejected()); + + startTimeInMills += 3000; + Mockito.when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat(21)); + threadPoolMetricsCollector.collectMetrics(startTimeInMills); + threadPoolStatus = readMetrics(); + assertEquals(1, threadPoolStatus.getRejected()); + + startTimeInMills += 3000; + Mockito.when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat(19)); + threadPoolMetricsCollector.collectMetrics(startTimeInMills); + threadPoolStatus = readMetrics(); + assertEquals(0, threadPoolStatus.getRejected()); + } + + private ThreadPoolStats generateThreadPoolStat(long rejected) { + List stats = new ArrayList<>(); + stats.add(new ThreadPoolStats.Stats("write", 0, 0, 0, rejected, 0, 0)); + return new ThreadPoolStats(stats); + } + + private List readEvents() { + List metrics = new ArrayList<>(); + PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics); + return metrics; + } + + private ThreadPoolStatus readMetrics() throws IOException { + List metrics = readEvents(); + assert metrics.size() == 1; + ObjectMapper objectMapper = new ObjectMapper(); + String[] jsonStrs = metrics.get(0).value.split("\n"); + assert jsonStrs.length == 2; + return objectMapper.readValue(jsonStrs[1], ThreadPoolStatus.class); + } }