From b6439c5d97052655dd75d7f1de1dc344ba38ecaa Mon Sep 17 00:00:00 2001 From: Nikola Grcevski <6207777+grcevski@users.noreply.github.com> Date: Tue, 19 Oct 2021 13:47:17 -0400 Subject: [PATCH] Add other time accounting in HotThreads (#79392) Currently, the HotThreads CPU report contains only the CPU cycles that a thread executes, excluding time spent in I/O or the JVM runtime (e.g. GC). This PR introduces the 'other' time accounting to augment the CPU time for better understanding of user visible time impact. --- .../hotthreads/NodesHotThreadsRequest.java | 17 + .../NodesHotThreadsRequestBuilder.java | 5 + .../TransportNodesHotThreadsAction.java | 1 + .../elasticsearch/bootstrap/Bootstrap.java | 2 + .../elasticsearch/monitor/jvm/HotThreads.java | 290 +++++++++---- .../cluster/RestNodesHotThreadsAction.java | 8 +- .../elasticsearch/bootstrap/security.policy | 2 - .../NodesHotThreadsRequestTests.java | 2 +- .../monitor/jvm/HotThreadsTests.java | 384 ++++++++++++------ 9 files changed, 488 insertions(+), 223 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java index 2be13918c797e..0e013dbe93d14 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsRequest.java @@ -8,6 +8,7 @@ package org.elasticsearch.action.admin.cluster.node.hotthreads; +import org.elasticsearch.Version; import org.elasticsearch.action.support.nodes.BaseNodesRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; @@ -22,6 +23,7 @@ public class NodesHotThreadsRequest extends BaseNodesRequest knownIdleStackFrames = Arrays.asList( @@ -57,7 +58,7 @@ public class HotThreads { // NOTE: these are JVM dependent and JVM version dependent private static final List knownJDKInternalThreads = Arrays.asList( - "Signal Dispatcher", "Finalizer", "Reference Handler", "Notification Thread", "Common-Cleaner", "process reaper" + "Signal Dispatcher", "Finalizer", "Reference Handler", "Notification Thread", "Common-Cleaner", "process reaper", "DestroyJavaVM" ); public enum ReportType { @@ -77,6 +78,8 @@ public String getTypeValue() { return type; } + // Custom enum from string because of legacy support. The standard Enum.valueOf is static + // and cannot be overriden to show a nice error message in case the enum value isn't found public static ReportType of(String type) { for (var report : values()) { if (report.type.equals(type)) { @@ -87,6 +90,33 @@ public static ReportType of(String type) { } } + public enum SortOrder { + + TOTAL("total"), + CPU("cpu"); + + private final String order; + + SortOrder(String order) { + this.order = order; + } + + public String getOrderValue() { + return order; + } + + // Custom enum from string because of legacy support. The standard Enum.valueOf is static + // and cannot be overriden to show a nice error message in case the enum value isn't found + public static SortOrder of(String order) { + for (var sortOrder : values()) { + if (sortOrder.order.equals(order)) { + return sortOrder; + } + } + throw new IllegalArgumentException("sort order not supported [" + order + "]"); + } + } + public HotThreads interval(TimeValue interval) { this.interval = interval; return this; @@ -117,9 +147,21 @@ public HotThreads type(ReportType type) { return this; } + public HotThreads sortOrder(SortOrder order) { + this.sortOrder = order; + return this; + } + public String detect() throws Exception { synchronized (mutex) { - return innerDetect(ManagementFactory.getThreadMXBean(), SunThreadInfo.INSTANCE, Thread.currentThread().getId()); + return innerDetect( + ManagementFactory.getThreadMXBean(), + SunThreadInfo.INSTANCE, + Thread.currentThread().getId(), + (interval) -> { + Thread.sleep(interval); + return null; + }); } } @@ -161,7 +203,7 @@ Map getAllValidThreadInfos(ThreadMXBean threadBean, continue; } long allocatedBytes = type == ReportType.MEM ? sunThreadInfo.getThreadAllocatedBytes(threadIds[i]) : 0; - result.put(threadIds[i], new ThreadTimeAccumulator(threadInfos[i], cpuTime, allocatedBytes)); + result.put(threadIds[i], new ThreadTimeAccumulator(threadInfos[i], interval, cpuTime, allocatedBytes)); } return result; @@ -180,16 +222,22 @@ ThreadInfo[][] captureThreadStacks(ThreadMXBean threadBean, long[] threadIds) th return result; } - private void setThreadWaitBlockTimeMonitoringEnabled(ThreadMXBean threadBean, boolean enabled) { + private boolean isThreadWaitBlockTimeMonitoringEnabled(ThreadMXBean threadBean) { if (threadBean.isThreadContentionMonitoringSupported()) { - AccessController.doPrivileged((PrivilegedAction) () -> { - threadBean.setThreadContentionMonitoringEnabled(enabled); - return null; - }); + return threadBean.isThreadContentionMonitoringEnabled(); } + return false; } - String innerDetect(ThreadMXBean threadBean, SunThreadInfo sunThreadInfo, long currentThreadId) throws Exception { + private double getTimeSharePercentage(long time) { + return (((double) time) / interval.nanos()) * 100; + } + + String innerDetect( + ThreadMXBean threadBean, + SunThreadInfo sunThreadInfo, + long currentThreadId, + SleepFunction threadSleep) throws Exception { if (threadBean.isThreadCpuTimeSupported() == false) { throw new ElasticsearchException("thread CPU time is not supported on this JDK"); } @@ -198,6 +246,12 @@ String innerDetect(ThreadMXBean threadBean, SunThreadInfo sunThreadInfo, long cu throw new ElasticsearchException("thread allocated memory is not supported on this JDK"); } + // Enabling thread contention monitoring is required for capturing JVM thread wait/blocked times. If we weren't + // able to enable this functionality during bootstrap, we should not produce HotThreads reports. + if (isThreadWaitBlockTimeMonitoringEnabled(threadBean) == false) { + throw new ElasticsearchException("thread wait/blocked time accounting is not supported on this JDK"); + } + StringBuilder sb = new StringBuilder() .append("Hot threads at ") .append(DATE_TIME_FORMATTER.format(LocalDateTime.now(Clock.systemUTC()))) @@ -209,99 +263,115 @@ String innerDetect(ThreadMXBean threadBean, SunThreadInfo sunThreadInfo, long cu .append(ignoreIdleThreads) .append(":\n"); - // Enabling thread contention monitoring is required for capturing JVM thread wait/blocked times - setThreadWaitBlockTimeMonitoringEnabled(threadBean, true); + // Capture before and after thread state with timings + Map previousThreadInfos = getAllValidThreadInfos(threadBean, sunThreadInfo, currentThreadId); + threadSleep.apply(interval.millis()); + Map latestThreadInfos = getAllValidThreadInfos(threadBean, sunThreadInfo, currentThreadId); - try { - // Capture before and after thread state with timings - Map previousThreadInfos = getAllValidThreadInfos(threadBean, sunThreadInfo, currentThreadId); - Thread.sleep(interval.millis()); - Map latestThreadInfos = getAllValidThreadInfos(threadBean, sunThreadInfo, currentThreadId); + latestThreadInfos.forEach((threadId, accumulator) -> accumulator.subtractPrevious(previousThreadInfos.get(threadId))); - latestThreadInfos.forEach((threadId, accumulator) -> accumulator.subtractPrevious(previousThreadInfos.get(threadId))); + // Sort by delta CPU time on thread. + List topThreads = new ArrayList<>(latestThreadInfos.values()); - // Sort by delta CPU time on thread. - List topThreads = new ArrayList<>(latestThreadInfos.values()); - final ToLongFunction getter = ThreadTimeAccumulator.valueGetterForReportType(type); + // Special comparator for CPU mode with TOTAL sort type only. Otherwise, we simply use the value. + if (type == ReportType.CPU && sortOrder == SortOrder.TOTAL) { + CollectionUtil.introSort(topThreads, + Comparator.comparingLong(ThreadTimeAccumulator::getRunnableTime) + .thenComparingLong(ThreadTimeAccumulator::getCpuTime).reversed()); + } else { + CollectionUtil.introSort(topThreads, + Comparator.comparingLong(ThreadTimeAccumulator.valueGetterForReportType(type)).reversed()); + } - CollectionUtil.introSort(topThreads, Comparator.comparingLong(getter).reversed()); - topThreads = topThreads.subList(0, Math.min(busiestThreads, topThreads.size())); - long[] topThreadIds = topThreads.stream().mapToLong(t -> t.threadId).toArray(); + topThreads = topThreads.subList(0, Math.min(busiestThreads, topThreads.size())); + long[] topThreadIds = topThreads.stream().mapToLong(t -> t.threadId).toArray(); - // analyse N stack traces for the top threads - ThreadInfo[][] allInfos = captureThreadStacks(threadBean, topThreadIds); + // analyse N stack traces for the top threads + ThreadInfo[][] allInfos = captureThreadStacks(threadBean, topThreadIds); - for (int t = 0; t < topThreads.size(); t++) { - long timeOrBytes = getter.applyAsLong(topThreads.get(t)); - String threadName = null; - for (ThreadInfo[] info : allInfos) { - if (info != null && info[t] != null) { - if (ignoreIdleThreads && isIdleThread(info[t])) { - info[t] = null; - continue; - } - threadName = info[t].getThreadName(); - break; + for (int t = 0; t < topThreads.size(); t++) { + String threadName = null; + for (ThreadInfo[] info : allInfos) { + if (info != null && info[t] != null) { + if (ignoreIdleThreads && isIdleThread(info[t])) { + info[t] = null; + continue; } + threadName = info[t].getThreadName(); + break; } - if (threadName == null) { - continue; // thread is not alive yet or died before the first snapshot - ignore it! - } + } + if (threadName == null) { + continue; // thread is not alive yet or died before the first snapshot - ignore it! + } + + ThreadTimeAccumulator topThread = topThreads.get(t); - if (type == ReportType.MEM) { + switch (type) { + case MEM: sb.append(String.format(Locale.ROOT, "%n%s memory allocated by thread '%s'%n", - new ByteSizeValue(timeOrBytes), threadName)); - } else { - double percent = (((double) timeOrBytes) / interval.nanos()) * 100; + new ByteSizeValue(topThread.getAllocatedBytes()), threadName)); + break; + case CPU: + double percentCpu = getTimeSharePercentage(topThread.getCpuTime()); + double percentOther = getTimeSharePercentage(topThread.getOtherTime()); + sb.append(String.format(Locale.ROOT, + "%n%4.1f%% [cpu=%1.1f%%, other=%1.1f%%] (%s out of %s) %s usage by thread '%s'%n", + percentOther + percentCpu, percentCpu, percentOther, + TimeValue.timeValueNanos(topThread.getCpuTime() + topThread.getOtherTime()), + interval, type.getTypeValue(), threadName)); + break; + default: + long time = ThreadTimeAccumulator.valueGetterForReportType(type).applyAsLong(topThread); + double percent = getTimeSharePercentage(time); sb.append(String.format(Locale.ROOT, "%n%4.1f%% (%s out of %s) %s usage by thread '%s'%n", - percent, TimeValue.timeValueNanos(timeOrBytes), interval, type.getTypeValue(), threadName)); + percent, TimeValue.timeValueNanos(time), interval, type.getTypeValue(), threadName)); + break; + } + + // for each snapshot (2nd array index) find later snapshot for same thread with max number of + // identical StackTraceElements (starting from end of each) + boolean[] done = new boolean[threadElementsSnapshotCount]; + for (int i = 0; i < threadElementsSnapshotCount; i++) { + if (done[i]) continue; + int maxSim = 1; + boolean[] similars = new boolean[threadElementsSnapshotCount]; + for (int j = i + 1; j < threadElementsSnapshotCount; j++) { + if (done[j]) continue; + int similarity = similarity(allInfos[i][t], allInfos[j][t]); + if (similarity > maxSim) { + maxSim = similarity; + similars = new boolean[threadElementsSnapshotCount]; + } + if (similarity == maxSim) similars[j] = true; } - // for each snapshot (2nd array index) find later snapshot for same thread with max number of - // identical StackTraceElements (starting from end of each) - boolean[] done = new boolean[threadElementsSnapshotCount]; - for (int i = 0; i < threadElementsSnapshotCount; i++) { - if (done[i]) continue; - int maxSim = 1; - boolean[] similars = new boolean[threadElementsSnapshotCount]; - for (int j = i + 1; j < threadElementsSnapshotCount; j++) { - if (done[j]) continue; - int similarity = similarity(allInfos[i][t], allInfos[j][t]); - if (similarity > maxSim) { - maxSim = similarity; - similars = new boolean[threadElementsSnapshotCount]; - } - if (similarity == maxSim) similars[j] = true; + // print out trace maxSim levels of i, and mark similar ones as done + int count = 1; + for (int j = i + 1; j < threadElementsSnapshotCount; j++) { + if (similars[j]) { + done[j] = true; + count++; } - // print out trace maxSim levels of i, and mark similar ones as done - int count = 1; - for (int j = i + 1; j < threadElementsSnapshotCount; j++) { - if (similars[j]) { - done[j] = true; - count++; + } + if (allInfos[i][t] != null) { + final StackTraceElement[] show = allInfos[i][t].getStackTrace(); + if (count == 1) { + sb.append(String.format(Locale.ROOT, " unique snapshot%n")); + for (StackTraceElement frame : show) { + sb.append(String.format(Locale.ROOT, " %s%n", frame)); } - } - if (allInfos[i][t] != null) { - final StackTraceElement[] show = allInfos[i][t].getStackTrace(); - if (count == 1) { - sb.append(String.format(Locale.ROOT, " unique snapshot%n")); - for (StackTraceElement frame : show) { - sb.append(String.format(Locale.ROOT, " %s%n", frame)); - } - } else { - sb.append(String.format(Locale.ROOT, " %d/%d snapshots sharing following %d elements%n", - count, threadElementsSnapshotCount, maxSim)); - for (int l = show.length - maxSim; l < show.length; l++) { - sb.append(String.format(Locale.ROOT, " %s%n", show[l])); - } + } else { + sb.append(String.format(Locale.ROOT, " %d/%d snapshots sharing following %d elements%n", + count, threadElementsSnapshotCount, maxSim)); + for (int l = show.length - maxSim; l < show.length; l++) { + sb.append(String.format(Locale.ROOT, " %s%n", show[l])); } } } } - - return sb.toString(); - } finally { - setThreadWaitBlockTimeMonitoringEnabled(threadBean, false); } + + return sb.toString(); } int similarity(ThreadInfo threadInfo, ThreadInfo threadInfo0) { @@ -320,18 +390,24 @@ int similarity(ThreadInfo threadInfo, ThreadInfo threadInfo0) { static class ThreadTimeAccumulator { private final long threadId; + private final TimeValue interval; private long cpuTime; private long blockedTime; private long waitedTime; private long allocatedBytes; - ThreadTimeAccumulator(ThreadInfo info, long cpuTime, long allocatedBytes) { - this.blockedTime = info.getBlockedTime(); - this.waitedTime = info.getWaitedTime(); + ThreadTimeAccumulator(ThreadInfo info, TimeValue interval, long cpuTime, long allocatedBytes) { + this.blockedTime = millisecondsToNanos(info.getBlockedTime()); // Convert to nanos to standardize + this.waitedTime = millisecondsToNanos(info.getWaitedTime()); // Convert to nanos to standardize this.cpuTime = cpuTime; this.allocatedBytes = allocatedBytes; this.threadId = info.getThreadId(); + this.interval = interval; + } + + private long millisecondsToNanos(long millis) { + return millis * 1_000_000; } void subtractPrevious(ThreadTimeAccumulator previous) { @@ -354,6 +430,25 @@ public long getCpuTime() { return Math.max(cpuTime, 0); } + public long getRunnableTime() { + // If the thread didn't have any CPU movement, we can't really tell if it's + // not running, or it has been asleep forever. + if (getCpuTime() == 0) { + return 0; + } + return Math.max(interval.nanos() - getWaitedTime() - getBlockedTime(), 0); + } + + public long getOtherTime() { + // If the thread didn't have any CPU movement, we can't really tell if it's + // not running, or it has been asleep forever. + if (getCpuTime() == 0) { + return 0; + } + + return Math.max(getRunnableTime() - getCpuTime(), 0); + } + public long getBlockedTime() { return Math.max(blockedTime, 0); } @@ -384,4 +479,23 @@ static ToLongFunction valueGetterForReportType(ReportType throw new IllegalArgumentException("expected thread type to be either 'cpu', 'wait', 'mem', or 'block', but was " + type); } } + + @FunctionalInterface + public interface SleepFunction { + R apply(T t) throws InterruptedException; + } + + public static void initializeRuntimeMonitoring() { + // We'll let the JVM boot without this functionality, however certain APIs like HotThreads will not + // function and report an error. + if (ManagementFactory.getThreadMXBean().isThreadContentionMonitoringSupported() == false) { + LogManager.getLogger(HotThreads.class).info("Thread wait/blocked time accounting not supported."); + } else { + try { + ManagementFactory.getThreadMXBean().setThreadContentionMonitoringEnabled(true); + } catch (UnsupportedOperationException monitoringUnavailable) { + LogManager.getLogger(HotThreads.class).warn("Thread wait/blocked time accounting cannot be enabled."); + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java index fb85bd481e556..8da1376905ea5 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java @@ -91,9 +91,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); NodesHotThreadsRequest nodesHotThreadsRequest = new NodesHotThreadsRequest(nodesIds); nodesHotThreadsRequest.threads(request.paramAsInt("threads", nodesHotThreadsRequest.threads())); - nodesHotThreadsRequest.ignoreIdleThreads(request.paramAsBoolean("ignore_idle_threads", nodesHotThreadsRequest.ignoreIdleThreads())); + nodesHotThreadsRequest.ignoreIdleThreads( + request.paramAsBoolean("ignore_idle_threads", nodesHotThreadsRequest.ignoreIdleThreads())); nodesHotThreadsRequest.type(HotThreads.ReportType.of(request.param("type", nodesHotThreadsRequest.type().getTypeValue()))); - nodesHotThreadsRequest.interval(TimeValue.parseTimeValue(request.param("interval"), nodesHotThreadsRequest.interval(), "interval")); + nodesHotThreadsRequest.sortOrder( + HotThreads.SortOrder.of(request.param("sort", nodesHotThreadsRequest.sortOrder().getOrderValue()))); + nodesHotThreadsRequest.interval( + TimeValue.parseTimeValue(request.param("interval"), nodesHotThreadsRequest.interval(), "interval")); nodesHotThreadsRequest.snapshots(request.paramAsInt("snapshots", nodesHotThreadsRequest.snapshots())); nodesHotThreadsRequest.timeout(request.param("timeout")); return channel -> client.admin().cluster().nodesHotThreads( diff --git a/server/src/main/resources/org/elasticsearch/bootstrap/security.policy b/server/src/main/resources/org/elasticsearch/bootstrap/security.policy index 85f36cf300b35..5ae15e74ec2d4 100644 --- a/server/src/main/resources/org/elasticsearch/bootstrap/security.policy +++ b/server/src/main/resources/org/elasticsearch/bootstrap/security.policy @@ -22,8 +22,6 @@ grant codeBase "${codebase.elasticsearch-secure-sm}" { grant codeBase "${codebase.elasticsearch}" { // needed for loading plugins which may expect the context class loader to be set permission java.lang.RuntimePermission "setContextClassLoader"; - // needed by HotThreads to enable wait/block time accounting on demand - permission java.lang.management.ManagementPermission "control"; }; //// Very special jar permissions: diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/hotthreads/NodesHotThreadsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/hotthreads/NodesHotThreadsRequestTests.java index 590ce0abc4522..645bb3352f26d 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/hotthreads/NodesHotThreadsRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/hotthreads/NodesHotThreadsRequestTests.java @@ -55,7 +55,7 @@ public void testBWCSerialization() throws IOException { out.setVersion(previous); request.writeTo(out); try (StreamInput in = out.bytes().streamInput()) { - in.setVersion(latest); + in.setVersion(previous); NodesHotThreadsRequest deserialized = new NodesHotThreadsRequest(in); assertEquals(request.threads(), deserialized.threads()); assertEquals(request.ignoreIdleThreads(), deserialized.ignoreIdleThreads()); diff --git a/server/src/test/java/org/elasticsearch/monitor/jvm/HotThreadsTests.java b/server/src/test/java/org/elasticsearch/monitor/jvm/HotThreadsTests.java index 68f536716f926..255fc09f1dbdf 100644 --- a/server/src/test/java/org/elasticsearch/monitor/jvm/HotThreadsTests.java +++ b/server/src/test/java/org/elasticsearch/monitor/jvm/HotThreadsTests.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.stringContainsInOrder; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.inOrder; @@ -32,6 +33,10 @@ public class HotThreadsTests extends ESTestCase { + private long nanosHelper(long millis) { + return millis * 1_000_000; + } + public void testSupportedThreadsReportType() { for (String type : new String[]{"unsupported", "", null, "CPU", "WAIT", "BLOCK", "MEM"}) { expectThrows(IllegalArgumentException.class, () -> new HotThreads().type(HotThreads.ReportType.of(type))); @@ -60,7 +65,8 @@ private List makeThreadStackHelper(List names) { public void testIdleThreadsDetection() { for (String threadName : new String[]{ - "Signal Dispatcher", "Finalizer", "Reference Handler", "Notification Thread", "Common-Cleaner", "process reaper"}) { + "Signal Dispatcher", "Finalizer", "Reference Handler", "Notification Thread", + "Common-Cleaner", "process reaper", "DestroyJavaVM"}) { ThreadInfo mockedThreadInfo = mock(ThreadInfo.class); when(mockedThreadInfo.getThreadName()).thenReturn(threadName); assertTrue(HotThreads.isKnownJDKThread(mockedThreadInfo)); @@ -196,7 +202,11 @@ class SimilarityTestCase { } private ThreadInfo makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, long threadId) { - when(mockedMXBean.getThreadCpuTime(threadId)).thenReturn(0L).thenReturn(threadId); + return makeThreadInfoMocksHelper(mockedMXBean, threadId, 1L); + } + + private ThreadInfo makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, long threadId, long cpuMultiplier) { + when(mockedMXBean.getThreadCpuTime(threadId)).thenReturn(0L).thenReturn(threadId * cpuMultiplier); ThreadInfo mockedThreadInfo = mock(ThreadInfo.class); when(mockedMXBean.getThreadInfo(eq(threadId), anyInt())).thenReturn(mockedThreadInfo); when(mockedThreadInfo.getThreadName()).thenReturn(String.format(Locale.ROOT, "Thread %d", threadId)); @@ -222,12 +232,16 @@ private ThreadInfo makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, long thr return mockedThreadInfo; } - // We call this helper for each different mode to reset the before and after timings. private List makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, long[] threadIds) { + return makeThreadInfoMocksHelper(mockedMXBean, threadIds, 1L); + } + + // We call this helper for each different mode to reset the before and after timings. + private List makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, long[] threadIds, long cpuMultiplier) { List allInfos = new ArrayList<>(threadIds.length); for (long threadId : threadIds) { - allInfos.add(makeThreadInfoMocksHelper(mockedMXBean, threadId)); + allInfos.add(makeThreadInfoMocksHelper(mockedMXBean, threadId, cpuMultiplier)); } when(mockedMXBean.getThreadInfo(Matchers.any(), anyInt())).thenReturn(allInfos.toArray(new ThreadInfo[0])); @@ -236,110 +250,222 @@ private List makeThreadInfoMocksHelper(ThreadMXBean mockedMXBean, lo return allInfos; } - public void testInnerDetect() throws Exception { + private ThreadMXBean makeMockMXBeanHelper() { ThreadMXBean mockedMXBean = mock(ThreadMXBean.class); when(mockedMXBean.isThreadCpuTimeSupported()).thenReturn(true); + when(mockedMXBean.isThreadContentionMonitoringSupported()).thenReturn(true); + when(mockedMXBean.isThreadContentionMonitoringEnabled()).thenReturn(true); + return mockedMXBean; + } + + private SunThreadInfo makeMockSunThreadInfoHelper() { SunThreadInfo mockedSunThreadInfo = mock(SunThreadInfo.class); when(mockedSunThreadInfo.isThreadAllocatedMemorySupported()).thenReturn(true); when(mockedSunThreadInfo.isThreadAllocatedMemoryEnabled()).thenReturn(true); + return mockedSunThreadInfo; + } + + public void testInnerDetectCPUMode() throws Exception { + ThreadMXBean mockedMXBean = makeMockMXBeanHelper(); + SunThreadInfo mockedSunThreadInfo = makeMockSunThreadInfoHelper(); + long[] threadIds = new long[]{1, 2, 3, 4}; // Adds up to 10, the intervalNanos for calculating time percentages long mockCurrentThreadId = 0L; when(mockedMXBean.getAllThreadIds()).thenReturn(threadIds); - List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); - List cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds, 100_000); + List cpuOrderedInfos = List.of(allInfos.get(0), allInfos.get(1), allInfos.get(2), allInfos.get(3)); when(mockedMXBean.getThreadInfo(Matchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); HotThreads hotThreads = new HotThreads() .busiestThreads(4) .type(HotThreads.ReportType.CPU) - .interval(TimeValue.timeValueNanos(10)) + .interval(TimeValue.timeValueMillis(10)) .threadElementsSnapshotCount(11) .ignoreIdleThreads(false); - String innerResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId); + String innerResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); assertThat(innerResult, containsString("Hot threads at ")); - assertThat(innerResult, containsString("interval=10nanos, busiestThreads=4, ignoreIdleThreads=false:")); + assertThat(innerResult, containsString("interval=10ms, busiestThreads=4, ignoreIdleThreads=false:")); assertThat(innerResult, containsString("11/11 snapshots sharing following 2 elements")); - assertThat(innerResult, containsString("40.0% (4nanos out of 10nanos) cpu usage by thread 'Thread 4'")); - assertThat(innerResult, containsString("30.0% (3nanos out of 10nanos) cpu usage by thread 'Thread 3'")); - assertThat(innerResult, containsString("20.0% (2nanos out of 10nanos) cpu usage by thread 'Thread 2'")); - assertThat(innerResult, containsString("10.0% (1nanos out of 10nanos) cpu usage by thread 'Thread 1'")); + assertThat(innerResult, stringContainsInOrder( + "90.0% [cpu=1.0%, other=89.0%] (9ms out of 10ms) cpu usage by thread 'Thread 1'", + "80.0% [cpu=2.0%, other=78.0%] (8ms out of 10ms) cpu usage by thread 'Thread 2'", + "70.0% [cpu=3.0%, other=67.0%] (7ms out of 10ms) cpu usage by thread 'Thread 3'", + "60.0% [cpu=4.0%, other=56.0%] (6ms out of 10ms) cpu usage by thread 'Thread 4'")); assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_0(Some_File:1)")); assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_1(Some_File:1)")); assertThat(innerResult, containsString("org.elasticsearch.monitor.testOther.methodFinal(Some_File:1)")); + // Let's ask again without progressing the CPU thread counters, e.g. resetting the mocks - innerResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId); + innerResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); + + assertThat(innerResult, containsString("0.0% [cpu=0.0%, other=0.0%] (0s out of 10ms) cpu usage by thread 'Thread 4'")); + assertThat(innerResult, containsString("0.0% [cpu=0.0%, other=0.0%] (0s out of 10ms) cpu usage by thread 'Thread 3'")); + assertThat(innerResult, containsString("0.0% [cpu=0.0%, other=0.0%] (0s out of 10ms) cpu usage by thread 'Thread 2'")); + assertThat(innerResult, containsString("0.0% [cpu=0.0%, other=0.0%] (0s out of 10ms) cpu usage by thread 'Thread 1'")); - assertThat(innerResult, containsString("0.0% (0s out of 10nanos) cpu usage by thread 'Thread 4'")); - assertThat(innerResult, containsString("0.0% (0s out of 10nanos) cpu usage by thread 'Thread 3'")); - assertThat(innerResult, containsString("0.0% (0s out of 10nanos) cpu usage by thread 'Thread 2'")); - assertThat(innerResult, containsString("0.0% (0s out of 10nanos) cpu usage by thread 'Thread 1'")); + // Test with the legacy sort order + allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds, 100_000); + cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); + when(mockedMXBean.getThreadInfo(Matchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); + + hotThreads = new HotThreads() + .busiestThreads(4) + .type(HotThreads.ReportType.CPU) + .interval(TimeValue.timeValueMillis(10)) + .sortOrder(HotThreads.SortOrder.CPU) + .threadElementsSnapshotCount(11) + .ignoreIdleThreads(false); + + innerResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); + + assertThat(innerResult, containsString("Hot threads at ")); + assertThat(innerResult, containsString("interval=10ms, busiestThreads=4, ignoreIdleThreads=false:")); + assertThat(innerResult, containsString("11/11 snapshots sharing following 2 elements")); + assertThat(innerResult, stringContainsInOrder( + "60.0% [cpu=4.0%, other=56.0%] (6ms out of 10ms) cpu usage by thread 'Thread 4'", + "70.0% [cpu=3.0%, other=67.0%] (7ms out of 10ms) cpu usage by thread 'Thread 3'", + "80.0% [cpu=2.0%, other=78.0%] (8ms out of 10ms) cpu usage by thread 'Thread 2'", + "90.0% [cpu=1.0%, other=89.0%] (9ms out of 10ms) cpu usage by thread 'Thread 1'")); + assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_0(Some_File:1)")); + assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_1(Some_File:1)")); + assertThat(innerResult, containsString("org.elasticsearch.monitor.testOther.methodFinal(Some_File:1)")); + } + + public void testInnerDetectWaitMode() throws Exception { + ThreadMXBean mockedMXBean = makeMockMXBeanHelper(); + SunThreadInfo mockedSunThreadInfo = makeMockSunThreadInfoHelper(); + + long[] threadIds = new long[]{1, 2, 3, 4}; // Adds up to 10, the intervalNanos for calculating time percentages + long mockCurrentThreadId = 0L; + when(mockedMXBean.getAllThreadIds()).thenReturn(threadIds); HotThreads hotWaitingThreads = new HotThreads() .busiestThreads(4) .type(HotThreads.ReportType.WAIT) - .interval(TimeValue.timeValueNanos(10)) + .interval(TimeValue.timeValueMillis(50)) .threadElementsSnapshotCount(11) .ignoreIdleThreads(false); - allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); List waitOrderedInfos = List.of(allInfos.get(3), allInfos.get(1), allInfos.get(0), allInfos.get(2)); when(mockedMXBean.getThreadInfo(Matchers.any(), anyInt())).thenReturn(waitOrderedInfos.toArray(new ThreadInfo[0])); - String waitInnerResult = hotWaitingThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId); + String waitInnerResult = hotWaitingThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); - assertThat(waitInnerResult, containsString("40.0% (4nanos out of 10nanos) wait usage by thread 'Thread 4'")); - assertThat(waitInnerResult, containsString("20.0% (2nanos out of 10nanos) wait usage by thread 'Thread 2'")); - assertThat(waitInnerResult, containsString("0.0% (0s out of 10nanos) wait usage by thread 'Thread 1'")); - assertThat(waitInnerResult, containsString("0.0% (0s out of 10nanos) wait usage by thread 'Thread 3'")); + assertThat(waitInnerResult, stringContainsInOrder( + "8.0% (4ms out of 50ms) wait usage by thread 'Thread 4'", + "4.0% (2ms out of 50ms) wait usage by thread 'Thread 2'", + "0.0% (0s out of 50ms) wait usage by thread 'Thread 1'", + "0.0% (0s out of 50ms) wait usage by thread 'Thread 3'")); + + // Sort order has no impact on wait mode + hotWaitingThreads = new HotThreads() + .busiestThreads(4) + .type(HotThreads.ReportType.WAIT) + .sortOrder(HotThreads.SortOrder.CPU) + .interval(TimeValue.timeValueMillis(50)) + .threadElementsSnapshotCount(11) + .ignoreIdleThreads(false); + allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + waitOrderedInfos = List.of(allInfos.get(3), allInfos.get(1), allInfos.get(0), allInfos.get(2)); + when(mockedMXBean.getThreadInfo(Matchers.any(), anyInt())).thenReturn(waitOrderedInfos.toArray(new ThreadInfo[0])); + + waitInnerResult = hotWaitingThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); + + assertThat(waitInnerResult, stringContainsInOrder( + "8.0% (4ms out of 50ms) wait usage by thread 'Thread 4'", + "4.0% (2ms out of 50ms) wait usage by thread 'Thread 2'", + "0.0% (0s out of 50ms) wait usage by thread 'Thread 1'", + "0.0% (0s out of 50ms) wait usage by thread 'Thread 3'")); + } + + public void testInnerDetectBlockedMode() throws Exception { + ThreadMXBean mockedMXBean = makeMockMXBeanHelper(); + SunThreadInfo mockedSunThreadInfo = makeMockSunThreadInfoHelper(); + + long[] threadIds = new long[]{1, 2, 3, 4}; // Adds up to 10, the intervalNanos for calculating time percentages + long mockCurrentThreadId = 0L; + when(mockedMXBean.getAllThreadIds()).thenReturn(threadIds); HotThreads hotBlockedThreads = new HotThreads() .busiestThreads(4) .type(HotThreads.ReportType.BLOCK) - .interval(TimeValue.timeValueNanos(10)) + .interval(TimeValue.timeValueMillis(60)) .threadElementsSnapshotCount(11) .ignoreIdleThreads(false); - allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); List blockOrderedInfos = List.of(allInfos.get(2), allInfos.get(0), allInfos.get(1), allInfos.get(3)); when(mockedMXBean.getThreadInfo(Matchers.any(), anyInt())).thenReturn(blockOrderedInfos.toArray(new ThreadInfo[0])); - String blockInnerResult = hotBlockedThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId); + String blockInnerResult = hotBlockedThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); - assertThat(blockInnerResult, containsString("30.0% (3nanos out of 10nanos) block usage by thread 'Thread 3'")); - assertThat(blockInnerResult, containsString("10.0% (1nanos out of 10nanos) block usage by thread 'Thread 1'")); - assertThat(blockInnerResult, containsString("0.0% (0s out of 10nanos) block usage by thread 'Thread 2'")); - assertThat(blockInnerResult, containsString("0.0% (0s out of 10nanos) block usage by thread 'Thread 4'")); + assertThat(blockInnerResult, stringContainsInOrder( + "5.0% (3ms out of 60ms) block usage by thread 'Thread 3'", + "1.7% (1ms out of 60ms) block usage by thread 'Thread 1'", + "0.0% (0s out of 60ms) block usage by thread 'Thread 2'", + "0.0% (0s out of 60ms) block usage by thread 'Thread 4'")); - // Test with only one stack to trigger the different print in innerDetect + // Sort order has no impact on block mode + hotBlockedThreads = new HotThreads() + .busiestThreads(4) + .type(HotThreads.ReportType.BLOCK) + .sortOrder(HotThreads.SortOrder.CPU) + .interval(TimeValue.timeValueMillis(60)) + .threadElementsSnapshotCount(11) + .ignoreIdleThreads(false); allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); - cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); + blockOrderedInfos = List.of(allInfos.get(2), allInfos.get(0), allInfos.get(1), allInfos.get(3)); + when(mockedMXBean.getThreadInfo(Matchers.any(), anyInt())).thenReturn(blockOrderedInfos.toArray(new ThreadInfo[0])); + + blockInnerResult = hotBlockedThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); + + assertThat(blockInnerResult, stringContainsInOrder( + "5.0% (3ms out of 60ms) block usage by thread 'Thread 3'", + "1.7% (1ms out of 60ms) block usage by thread 'Thread 1'", + "0.0% (0s out of 60ms) block usage by thread 'Thread 2'", + "0.0% (0s out of 60ms) block usage by thread 'Thread 4'")); + } + + public void testInnerDetectMemoryMode() throws Exception { + ThreadMXBean mockedMXBean = makeMockMXBeanHelper(); + SunThreadInfo mockedSunThreadInfo = makeMockSunThreadInfoHelper(); + + long[] threadIds = new long[]{1, 2, 3, 4}; // Adds up to 10, the intervalNanos for calculating time percentages + long mockCurrentThreadId = 0L; + when(mockedMXBean.getAllThreadIds()).thenReturn(threadIds); + + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); when(mockedMXBean.getThreadInfo(Matchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); - hotThreads = new HotThreads() + for (long threadId : threadIds) { + when(mockedSunThreadInfo.getThreadAllocatedBytes(threadId)).thenReturn(0L).thenReturn(threadId*100); + } + + HotThreads hotThreads = new HotThreads() .busiestThreads(4) - .type(HotThreads.ReportType.CPU) + .type(HotThreads.ReportType.MEM) .interval(TimeValue.timeValueNanos(10)) .threadElementsSnapshotCount(1) .ignoreIdleThreads(false); - String singleResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId); + String memInnerResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); + assertThat(memInnerResult, containsString(" unique snapshot")); + assertThat(memInnerResult, stringContainsInOrder( + "400b memory allocated by thread 'Thread 4'", + "300b memory allocated by thread 'Thread 3'", + "200b memory allocated by thread 'Thread 2'", + "100b memory allocated by thread 'Thread 1'")); - assertThat(singleResult, containsString(" unique snapshot")); - assertEquals(5, singleResult.split(" unique snapshot").length); - assertThat(singleResult, containsString("40.0% (4nanos out of 10nanos) cpu usage by thread 'Thread 4'")); - assertThat(singleResult, containsString("30.0% (3nanos out of 10nanos) cpu usage by thread 'Thread 3'")); - assertThat(singleResult, containsString("20.0% (2nanos out of 10nanos) cpu usage by thread 'Thread 2'")); - assertThat(singleResult, containsString("10.0% (1nanos out of 10nanos) cpu usage by thread 'Thread 1'")); - assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_0(Some_File:1)")); - assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_1(Some_File:1)")); - assertThat(innerResult, containsString("org.elasticsearch.monitor.testOther.methodFinal(Some_File:1)")); + // Sort order has no impact on memory mode allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); @@ -352,21 +478,55 @@ public void testInnerDetect() throws Exception { hotThreads = new HotThreads() .busiestThreads(4) .type(HotThreads.ReportType.MEM) + .sortOrder(HotThreads.SortOrder.CPU) .interval(TimeValue.timeValueNanos(10)) .threadElementsSnapshotCount(1) .ignoreIdleThreads(false); - String memInnerResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId); + memInnerResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); assertThat(memInnerResult, containsString(" unique snapshot")); - assertThat(memInnerResult, containsString("400b memory allocated by thread 'Thread 4'")); - assertThat(memInnerResult, containsString("300b memory allocated by thread 'Thread 3'")); - assertThat(memInnerResult, containsString("200b memory allocated by thread 'Thread 2'")); - assertThat(memInnerResult, containsString("100b memory allocated by thread 'Thread 1'")); + assertThat(memInnerResult, stringContainsInOrder( + "400b memory allocated by thread 'Thread 4'", + "300b memory allocated by thread 'Thread 3'", + "200b memory allocated by thread 'Thread 2'", + "100b memory allocated by thread 'Thread 1'")); + } + + public void testInnerDetectSingleSnapshot() throws Exception { + ThreadMXBean mockedMXBean = makeMockMXBeanHelper(); + SunThreadInfo mockedSunThreadInfo = makeMockSunThreadInfoHelper(); + + long[] threadIds = new long[]{1, 2, 3, 4}; // Adds up to 10, the intervalNanos for calculating time percentages + long mockCurrentThreadId = 0L; + when(mockedMXBean.getAllThreadIds()).thenReturn(threadIds); + + // Test with only one stack to trigger the different print in innerDetect + List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); + List cpuOrderedInfos = List.of(allInfos.get(3), allInfos.get(2), allInfos.get(1), allInfos.get(0)); + when(mockedMXBean.getThreadInfo(Matchers.any(), anyInt())).thenReturn(cpuOrderedInfos.toArray(new ThreadInfo[0])); + + HotThreads hotThreads = new HotThreads() + .busiestThreads(4) + .type(HotThreads.ReportType.CPU) + .interval(TimeValue.timeValueNanos(10)) + .threadElementsSnapshotCount(1) + .ignoreIdleThreads(false); + + String singleResult = hotThreads.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId, (interval) -> null); + + assertThat(singleResult, containsString(" unique snapshot")); + assertEquals(5, singleResult.split(" unique snapshot").length); + assertThat(singleResult, containsString("40.0% [cpu=40.0%, other=0.0%] (4nanos out of 10nanos) cpu usage by thread 'Thread 4'")); + assertThat(singleResult, containsString("30.0% [cpu=30.0%, other=0.0%] (3nanos out of 10nanos) cpu usage by thread 'Thread 3'")); + assertThat(singleResult, containsString("20.0% [cpu=20.0%, other=0.0%] (2nanos out of 10nanos) cpu usage by thread 'Thread 2'")); + assertThat(singleResult, containsString("10.0% [cpu=10.0%, other=0.0%] (1nanos out of 10nanos) cpu usage by thread 'Thread 1'")); + assertThat(singleResult, containsString("org.elasticsearch.monitor.test.method_0(Some_File:1)")); + assertThat(singleResult, containsString("org.elasticsearch.monitor.test.method_1(Some_File:1)")); + assertThat(singleResult, containsString("org.elasticsearch.monitor.testOther.methodFinal(Some_File:1)")); } public void testEnsureInnerDetectSkipsCurrentThread() throws Exception { - ThreadMXBean mockedMXBean = mock(ThreadMXBean.class); - when(mockedMXBean.isThreadCpuTimeSupported()).thenReturn(true); + ThreadMXBean mockedMXBean = makeMockMXBeanHelper(); long mockCurrentThreadId = 5L; long[] threadIds = new long[]{mockCurrentThreadId}; // Matches half the intervalNanos for calculating time percentages @@ -383,7 +543,7 @@ public void testEnsureInnerDetectSkipsCurrentThread() throws Exception { .threadElementsSnapshotCount(11) .ignoreIdleThreads(false); - String innerResult = hotThreads.innerDetect(mockedMXBean, mock(SunThreadInfo.class), mockCurrentThreadId); + String innerResult = hotThreads.innerDetect(mockedMXBean, mock(SunThreadInfo.class), mockCurrentThreadId, (interval) -> null); assertEquals(1, innerResult.lines().count()); } @@ -394,11 +554,14 @@ public void testReportTypeValueGetter() { when(mockedThreadInfo.getBlockedTime()).thenReturn(2L).thenReturn(0L); when(mockedThreadInfo.getWaitedTime()).thenReturn(3L).thenReturn(0L); - HotThreads.ThreadTimeAccumulator info = new HotThreads.ThreadTimeAccumulator(mockedThreadInfo, 1L, 4L); + HotThreads.ThreadTimeAccumulator info = new HotThreads.ThreadTimeAccumulator( + mockedThreadInfo, new TimeValue(10L), 1L, 4L); assertEquals(1L, HotThreads.ThreadTimeAccumulator.valueGetterForReportType(HotThreads.ReportType.CPU).applyAsLong(info)); - assertEquals(3L, HotThreads.ThreadTimeAccumulator.valueGetterForReportType(HotThreads.ReportType.WAIT).applyAsLong(info)); - assertEquals(2L, HotThreads.ThreadTimeAccumulator.valueGetterForReportType(HotThreads.ReportType.BLOCK).applyAsLong(info)); + assertEquals(nanosHelper(3L), + HotThreads.ThreadTimeAccumulator.valueGetterForReportType(HotThreads.ReportType.WAIT).applyAsLong(info)); + assertEquals(nanosHelper(2L), + HotThreads.ThreadTimeAccumulator.valueGetterForReportType(HotThreads.ReportType.BLOCK).applyAsLong(info)); assertEquals(4L, HotThreads.ThreadTimeAccumulator.valueGetterForReportType(HotThreads.ReportType.MEM).applyAsLong(info)); //Ensure all enum types have a report type getter @@ -438,7 +601,7 @@ public void testGetAllValidThreadInfos() { assertEquals(0, accumulator.getWaitedTime()); } - // Fake sleep, e.g don't sleep call the mock again + // Fake sleep, e.g. don't sleep call the mock again Map afterValidInfos = hotThreads.getAllValidThreadInfos(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId); @@ -449,8 +612,8 @@ public void testGetAllValidThreadInfos() { boolean evenThreadId = ((threadId % 2) == 0); assertEquals(threadId, accumulator.getCpuTime()); - assertEquals((evenThreadId) ? 0 : threadId, accumulator.getBlockedTime()); - assertEquals((evenThreadId) ? threadId : 0, accumulator.getWaitedTime()); + assertEquals(nanosHelper((evenThreadId) ? 0 : threadId), accumulator.getBlockedTime()); + assertEquals(nanosHelper((evenThreadId) ? threadId : 0), accumulator.getWaitedTime()); } // Test when a thread has terminated during sleep, we don't report that thread @@ -476,8 +639,8 @@ public void testGetAllValidThreadInfos() { boolean evenThreadId = ((threadId % 2) == 0); assertEquals(threadId, accumulator.getCpuTime()); - assertEquals((evenThreadId) ? 0 : threadId, accumulator.getBlockedTime()); - assertEquals((evenThreadId) ? threadId : 0, accumulator.getWaitedTime()); + assertEquals(nanosHelper((evenThreadId) ? 0 : threadId), accumulator.getBlockedTime()); + assertEquals(nanosHelper((evenThreadId) ? threadId : 0), accumulator.getWaitedTime()); } // Test capturing timings for thread that launched while we were sleeping @@ -493,8 +656,8 @@ public void testGetAllValidThreadInfos() { HotThreads.ThreadTimeAccumulator firstAccumulator = afterValidInfos.get(removedInfo.getThreadId()); assertEquals(1, firstAccumulator.getCpuTime()); - assertEquals(0, firstAccumulator.getWaitedTime()); - assertEquals(1, firstAccumulator.getBlockedTime()); + assertEquals(nanosHelper(0), firstAccumulator.getWaitedTime()); + assertEquals(nanosHelper(1), firstAccumulator.getBlockedTime()); // Test skipping of current thread validInfos = hotThreads.getAllValidThreadInfos(mockedMXBean, mockedSunThreadInfo, threadIds[threadIds.length - 1]); @@ -528,7 +691,7 @@ public void testCaptureThreadStacks() throws InterruptedException { .threadElementsSnapshotCount(3) .ignoreIdleThreads(false); - // Setup the mocks + // Set up the mocks List allInfos = makeThreadInfoMocksHelper(mockedMXBean, threadIds); long[] topThreadIds = new long[]{threadIds[threadIds.length - 1], threadIds[threadIds.length - 2]}; @@ -555,17 +718,24 @@ public void testThreadInfoAccumulator() { ThreadInfo threadOne = makeThreadInfoMocksHelper(mockedMXBean, 1L); ThreadInfo threadTwo = makeThreadInfoMocksHelper(mockedMXBean, 2L); - HotThreads.ThreadTimeAccumulator acc = new HotThreads.ThreadTimeAccumulator(threadOne, 100L, 1000L); - HotThreads.ThreadTimeAccumulator accNext = new HotThreads.ThreadTimeAccumulator(threadOne, 250L, 2500L); + TimeValue maxTime = new TimeValue(1000L); + + HotThreads.ThreadTimeAccumulator acc = new HotThreads.ThreadTimeAccumulator( + threadOne, maxTime, 100L, 1000L); + HotThreads.ThreadTimeAccumulator accNext = new HotThreads.ThreadTimeAccumulator( + threadOne, maxTime, 250L, 2500L); accNext.subtractPrevious(acc); assertEquals(1500, accNext.getAllocatedBytes()); assertEquals(150, accNext.getCpuTime()); assertEquals(0, accNext.getWaitedTime()); - assertEquals(1, accNext.getBlockedTime()); + assertEquals(nanosHelper(1), accNext.getBlockedTime()); + assertEquals(nanosHelper(999), accNext.getRunnableTime()); - HotThreads.ThreadTimeAccumulator accNotMoving = new HotThreads.ThreadTimeAccumulator(threadOne, 250L, 2500L); - HotThreads.ThreadTimeAccumulator accNotMovingNext = new HotThreads.ThreadTimeAccumulator(threadOne, 250L, 2500L); + HotThreads.ThreadTimeAccumulator accNotMoving = new HotThreads.ThreadTimeAccumulator( + threadOne, maxTime, 250L, 2500L); + HotThreads.ThreadTimeAccumulator accNotMovingNext = new HotThreads.ThreadTimeAccumulator( + threadOne, maxTime, 250L, 2500L); accNotMovingNext.subtractPrevious(accNotMoving); @@ -573,9 +743,12 @@ public void testThreadInfoAccumulator() { assertEquals(0, accNotMovingNext.getCpuTime()); assertEquals(0, accNotMovingNext.getWaitedTime()); assertEquals(0, accNotMovingNext.getBlockedTime()); + assertEquals(0, accNotMovingNext.getRunnableTime()); - HotThreads.ThreadTimeAccumulator accOne = new HotThreads.ThreadTimeAccumulator(threadOne, 250L, 2500L); - HotThreads.ThreadTimeAccumulator accTwo = new HotThreads.ThreadTimeAccumulator(threadTwo, 350L, 3500L); + HotThreads.ThreadTimeAccumulator accOne = new HotThreads.ThreadTimeAccumulator( + threadOne, maxTime, 250L, 2500L); + HotThreads.ThreadTimeAccumulator accTwo = new HotThreads.ThreadTimeAccumulator( + threadTwo, maxTime, 350L, 3500L); expectThrows(IllegalStateException.class, () -> accTwo.subtractPrevious(accOne)); } @@ -600,51 +773,21 @@ public void testWaitBlockTimeMonitoringEnabled() throws Exception { .threadElementsSnapshotCount(11) .ignoreIdleThreads(false); - String innerResult = hotThreads.innerDetect(mockedMXBean, mock(SunThreadInfo.class), mockCurrentThreadId); - - assertThat(innerResult, containsString("Hot threads at ")); - assertThat(innerResult, containsString("interval=10nanos, busiestThreads=4, ignoreIdleThreads=false:")); - assertThat(innerResult, containsString("11/11 snapshots sharing following 2 elements")); - assertThat(innerResult, containsString("40.0% (4nanos out of 10nanos) cpu usage by thread 'Thread 4'")); - assertThat(innerResult, containsString("30.0% (3nanos out of 10nanos) cpu usage by thread 'Thread 3'")); - assertThat(innerResult, containsString("20.0% (2nanos out of 10nanos) cpu usage by thread 'Thread 2'")); - assertThat(innerResult, containsString("10.0% (1nanos out of 10nanos) cpu usage by thread 'Thread 1'")); - assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_0(Some_File:1)")); - assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_1(Some_File:1)")); - assertThat(innerResult, containsString("org.elasticsearch.monitor.testOther.methodFinal(Some_File:1)")); - - // Ensure we called the monitoring enabled with true and then with false - InOrder orderVerifier = inOrder(mockedMXBean); - orderVerifier.verify(mockedMXBean).setThreadContentionMonitoringEnabled(true); - orderVerifier.verify(mockedMXBean).setThreadContentionMonitoringEnabled(false); - } - - public void testWaitBlockTimeMonitoringEnabledWithException() { - ThreadMXBean mockedMXBean = mock(ThreadMXBean.class); - when(mockedMXBean.isThreadCpuTimeSupported()).thenReturn(true); - when(mockedMXBean.isThreadContentionMonitoringSupported()).thenReturn(true); - - when(mockedMXBean.getAllThreadIds()).thenThrow(new RuntimeException("Something bad happened")); - - HotThreads hotThreads = new HotThreads() - .busiestThreads(4) - .type(HotThreads.ReportType.CPU) - .interval(TimeValue.timeValueNanos(10)) - .threadElementsSnapshotCount(11) - .ignoreIdleThreads(false); - - expectThrows(RuntimeException.class, () -> hotThreads.innerDetect(mockedMXBean, mock(SunThreadInfo.class), 0L)); + Exception e = expectThrows(ElasticsearchException.class, () -> + hotThreads.innerDetect(mockedMXBean, mock(SunThreadInfo.class), mockCurrentThreadId, (interval) -> null)); + assertEquals(e.getMessage(), "thread wait/blocked time accounting is not supported on this JDK"); - // Ensure we called the monitoring enabled with true and then with false even with exception thrown + // Ensure we checked if JVM lock monitoring is enabled InOrder orderVerifier = inOrder(mockedMXBean); - orderVerifier.verify(mockedMXBean).setThreadContentionMonitoringEnabled(true); - orderVerifier.verify(mockedMXBean).setThreadContentionMonitoringEnabled(false); + orderVerifier.verify(mockedMXBean).isThreadContentionMonitoringSupported(); + orderVerifier.verify(mockedMXBean).isThreadContentionMonitoringEnabled(); } public void testGetThreadAllocatedBytesFailures() throws Exception { ThreadMXBean mockedMXBean = mock(ThreadMXBean.class); when(mockedMXBean.isThreadCpuTimeSupported()).thenReturn(true); when(mockedMXBean.isThreadContentionMonitoringSupported()).thenReturn(true); + when(mockedMXBean.isThreadContentionMonitoringEnabled()).thenReturn(true); SunThreadInfo mockedSunThreadInfo = mock(SunThreadInfo.class); when(mockedSunThreadInfo.isThreadAllocatedMemorySupported()).thenReturn(false); @@ -665,26 +808,7 @@ public void testGetThreadAllocatedBytesFailures() throws Exception { .ignoreIdleThreads(false); ElasticsearchException exception = expectThrows(ElasticsearchException.class, - () -> hotThreads0.innerDetect(mockedMXBean, mockedSunThreadInfo, 0L)); + () -> hotThreads0.innerDetect(mockedMXBean, mockedSunThreadInfo, 0L, (interval) -> null)); assertThat(exception.getMessage(), equalTo("thread allocated memory is not supported on this JDK")); - - // making sure CPU type was not affected when isThreadAllocatedMemorySupported() == false - - HotThreads hotThreads1 = new HotThreads() - .busiestThreads(4) - .type(HotThreads.ReportType.CPU) - .interval(TimeValue.timeValueNanos(10)) - .threadElementsSnapshotCount(1) - .ignoreIdleThreads(false); - - String innerResult = hotThreads1.innerDetect(mockedMXBean, mockedSunThreadInfo, mockCurrentThreadId); - assertThat(innerResult, containsString("Hot threads at ")); - assertThat(innerResult, containsString("40.0% (4nanos out of 10nanos) cpu usage by thread 'Thread 4'")); - assertThat(innerResult, containsString("30.0% (3nanos out of 10nanos) cpu usage by thread 'Thread 3'")); - assertThat(innerResult, containsString("20.0% (2nanos out of 10nanos) cpu usage by thread 'Thread 2'")); - assertThat(innerResult, containsString("10.0% (1nanos out of 10nanos) cpu usage by thread 'Thread 1'")); - assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_0(Some_File:1)")); - assertThat(innerResult, containsString("org.elasticsearch.monitor.test.method_1(Some_File:1)")); - assertThat(innerResult, containsString("org.elasticsearch.monitor.testOther.methodFinal(Some_File:1)")); } }