Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Calculate rejection increase and emit the delta increase of rejection…
Browse files Browse the repository at this point in the history
… as metric (#124)
  • Loading branch information
rguo-aws authored Jun 23, 2020
1 parent d4ecfda commit cfe65cd
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,25 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.elasticsearch.threadpool.ThreadPoolStats.Stats;

public class ThreadPoolMetricsCollector extends PerformanceAnalyzerMetricsCollector implements MetricsProcessor {
public static final int SAMPLING_TIME_INTERVAL = MetricsConfiguration.CONFIG_MAP.get(ThreadPoolMetricsCollector.class).samplingInterval;
private static final int KEYS_PATH_LENGTH = 0;
private StringBuilder value;
private final Map<String, ThreadPoolStatsRecord> statsRecordMap;

public ThreadPoolMetricsCollector() {
super(SAMPLING_TIME_INTERVAL, "ThreadPoolMetrics");
value = new StringBuilder();
statsRecordMap = new HashMap<>();
}

@Override
Expand All @@ -52,6 +57,24 @@ public void collectMetrics(long startTime) {

while (statsIterator.hasNext()) {
Stats stats = statsIterator.next();
long rejectionDelta = 0;
String threadPoolName = stats.getName();
if (statsRecordMap.containsKey(threadPoolName)) {
ThreadPoolStatsRecord lastRecord = statsRecordMap.get(threadPoolName);
// if the timestamp in previous record is greater than 15s (3 * intervals),
// then the scheduler might hang or freeze due to long GC etc. We simply drop
// previous record here and set rejectionDelta to 0.
if (startTime - lastRecord.getTimestamp() <= SAMPLING_TIME_INTERVAL * 3) {
rejectionDelta = stats.getRejected() - lastRecord.getRejected();
// we might not run into this as rejection is a LongAdder which never decrement its count.
// regardless, let's set it to 0 to be safe.
if (rejectionDelta < 0) {
rejectionDelta = 0;
}
}
}
statsRecordMap.put(threadPoolName, new ThreadPoolStatsRecord(startTime, stats.getRejected()));
final long finalRejectionDelta = rejectionDelta;
ThreadPoolStatus threadPoolStatus = AccessController.doPrivileged((PrivilegedAction<ThreadPoolStatus>) () -> {
try {
//This is for backward compatibility. core ES may or may not emit latency metric
Expand All @@ -64,13 +87,13 @@ public void collectMetrics(long startTime) {
Method getCapacityMethod = Stats.class.getMethod("getCapacity");
int capacity = (Integer) getCapacityMethod.invoke(stats);
return new ThreadPoolStatus(stats.getName(),
stats.getQueue(), stats.getRejected(),
stats.getQueue(), finalRejectionDelta,
stats.getThreads(), stats.getActive(),
latency, capacity);
} catch (Exception e) {
//core ES does not have the latency patch. send the threadpool metrics without adding latency.
return new ThreadPoolStatus(stats.getName(),
stats.getQueue(), stats.getRejected(),
stats.getQueue(), finalRejectionDelta,
stats.getThreads(), stats.getActive());
}
});
Expand All @@ -90,6 +113,24 @@ public String getMetricsPath(long startTime, String... keysPath) {
return PerformanceAnalyzerMetrics.generatePath(startTime, PerformanceAnalyzerMetrics.sThreadPoolPath);
}

private static class ThreadPoolStatsRecord {
private final long timestamp;
private final long rejected;

ThreadPoolStatsRecord(long timestamp, long rejected) {
this.timestamp = timestamp;
this.rejected = rejected;
}

public long getTimestamp() {
return timestamp;
}

public long getRejected() {
return rejected;
}
}

public static class ThreadPoolStatus extends MetricStatus {
public final String type;
public final int queueSize;
Expand Down Expand Up @@ -131,6 +172,19 @@ public ThreadPoolStatus(String type,
this.queueCapacity = queueCapacity;
}

// default constructor for jackson to de-serialize this class
// from json string in unit test
@VisibleForTesting
public ThreadPoolStatus() {
this.type = "testing";
this.queueSize = -1;
this.rejected = -1;
this.threadsCount = -1;
this.threadsActive = -1;
this.queueLatency = null;
this.queueCapacity = null;
}

@JsonProperty(ThreadPoolDimension.Constants.TYPE_VALUE)
public String getType() {
return type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,48 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors;

import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.CustomMetricsLocationTestBase;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.config.PluginSettings;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.ESResources;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector.ThreadPoolStatus;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.MetricsConfiguration;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.PerformanceAnalyzerMetrics;
import static org.junit.Assert.assertEquals;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader_writer_shared.Event;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolStats;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;

@Ignore
public class ThreadPoolMetricsCollectorTests extends CustomMetricsLocationTestBase {

private ThreadPoolMetricsCollector threadPoolMetricsCollector;

@Mock
private ThreadPool mockThreadPool;

@Before
public void init() {
mockThreadPool = Mockito.mock(ThreadPool.class);
ESResources.INSTANCE.setThreadPool(mockThreadPool);
System.setProperty("performanceanalyzer.metrics.log.enabled", "False");
MetricsConfiguration.CONFIG_MAP.put(ThreadPoolMetricsCollector.class, MetricsConfiguration.cdefault);
threadPoolMetricsCollector = new ThreadPoolMetricsCollector();
}

@Test
public void testThreadPoolMetrics() {
System.setProperty("performanceanalyzer.metrics.log.enabled", "False");
long startTimeInMills = 1453724339;

MetricsConfiguration.CONFIG_MAP.put(ThreadPoolMetricsCollector.class, MetricsConfiguration.cdefault);

ThreadPoolMetricsCollector threadPoolMetricsCollector = new ThreadPoolMetricsCollector();
threadPoolMetricsCollector.saveMetricValues("12321.5464", startTimeInMills);


String fetchedValue = PerformanceAnalyzerMetrics.getMetric(
PluginSettings.instance().getMetricsLocation()
+ PerformanceAnalyzerMetrics.getTimeInterval(startTimeInMills)+"/thread_pool/");
PerformanceAnalyzerMetrics.removeMetrics(PluginSettings.instance().getMetricsLocation()
+ PerformanceAnalyzerMetrics.getTimeInterval(startTimeInMills));
assertEquals("12321.5464", fetchedValue);
List<Event> metrics = readEvents();
assertEquals(1, metrics.size());
assertEquals("12321.5464", metrics.get(0).value);

try {
threadPoolMetricsCollector.saveMetricValues("12321.5464", startTimeInMills, "123");
Expand All @@ -59,4 +72,64 @@ public void testThreadPoolMetrics() {
//- expecting exception...2 values passed; 0 expected
}
}

@Test
public void testCollectMetrics() throws IOException {
long startTimeInMills = 1453724339;
Mockito.when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat(2));
threadPoolMetricsCollector.collectMetrics(startTimeInMills);
ThreadPoolStatus threadPoolStatus = readMetrics();
assertEquals(0, threadPoolStatus.getRejected());

startTimeInMills += 5000;
Mockito.when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat(4));
threadPoolMetricsCollector.collectMetrics(startTimeInMills);
threadPoolStatus = readMetrics();
assertEquals(2, threadPoolStatus.getRejected());

startTimeInMills += 12000;
Mockito.when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat(9));
threadPoolMetricsCollector.collectMetrics(startTimeInMills);
threadPoolStatus = readMetrics();
assertEquals(5, threadPoolStatus.getRejected());

startTimeInMills += 16000;
Mockito.when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat(20));
threadPoolMetricsCollector.collectMetrics(startTimeInMills);
threadPoolStatus = readMetrics();
assertEquals(0, threadPoolStatus.getRejected());

startTimeInMills += 3000;
Mockito.when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat(21));
threadPoolMetricsCollector.collectMetrics(startTimeInMills);
threadPoolStatus = readMetrics();
assertEquals(1, threadPoolStatus.getRejected());

startTimeInMills += 3000;
Mockito.when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat(19));
threadPoolMetricsCollector.collectMetrics(startTimeInMills);
threadPoolStatus = readMetrics();
assertEquals(0, threadPoolStatus.getRejected());
}

private ThreadPoolStats generateThreadPoolStat(long rejected) {
List<ThreadPoolStats.Stats> stats = new ArrayList<>();
stats.add(new ThreadPoolStats.Stats("write", 0, 0, 0, rejected, 0, 0));
return new ThreadPoolStats(stats);
}

private List<Event> readEvents() {
List<Event> metrics = new ArrayList<>();
PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics);
return metrics;
}

private ThreadPoolStatus readMetrics() throws IOException {
List<Event> metrics = readEvents();
assert metrics.size() == 1;
ObjectMapper objectMapper = new ObjectMapper();
String[] jsonStrs = metrics.get(0).value.split("\n");
assert jsonStrs.length == 2;
return objectMapper.readValue(jsonStrs[1], ThreadPoolStatus.class);
}
}

0 comments on commit cfe65cd

Please sign in to comment.