Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add functionality for an automatic thread dump upon thread pool exhaustion #137

Open
wants to merge 2 commits into
base: 2.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand Down Expand Up @@ -278,6 +279,21 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme
*/
volatile Runnable terminationTask;

/**
* If the thread pool has reached its max and generated a thread dump.
*/
private AtomicBoolean reachedMax = new AtomicBoolean();

/**
* If a WARN is allowed upon thread exhaustion.
*/
private boolean allowExhaustionWarn;

/**
* If a dump is allowed upon thread exhaustion.
*/
private boolean allowExhaustionDump;

// =======================================================
// Statistics fields and counters
// =======================================================
Expand Down Expand Up @@ -379,6 +395,8 @@ public final class EnhancedQueueExecutor extends AbstractExecutorService impleme
head = tail = new TaskNode(null);
// thread stat
threadStatus = withCoreSize(withMaxSize(withAllowCoreTimeout(0L, builder.allowsCoreThreadTimeOut()), maxSize), coreSize);
withAllowExhaustionWarn(builder.allowsExhaustionWarn());
withAllowExhaustionDump(builder.allowsExhaustionDump());
timeoutNanos = max(1L, keepAliveTime);
queueSize = withMaxQueueSize(withCurrentQueueSize(0L, 0), builder.getMaximumQueueSize());
mxBean = new MXBeanImpl();
Expand Down Expand Up @@ -421,6 +439,8 @@ public static final class Builder {
private TimeUnit keepAliveUnits = TimeUnit.SECONDS;
private float growthResistance;
private boolean allowCoreTimeOut;
private boolean allowExhaustionWarn;
private boolean allowExhaustionDump;
private int maxQueueSize = Integer.MAX_VALUE;
private boolean registerMBean = REGISTER_MBEAN;
private String mBeanName;
Expand Down Expand Up @@ -607,6 +627,50 @@ public Builder allowCoreThreadTimeOut(final boolean allowCoreTimeOut) {
return this;
}

/**
* Determine whether a WARN logged upon thread exhaustion is allowed.
*
* @return {@code true} if a WARN upon thread exhaustion is allowed, {@code false} otherwise
* @see EnhancedQueueExecutor#allowsExhaustionWarn()
*/
public boolean allowsExhaustionWarn() {
return allowExhaustionWarn;
}

/**
* Establish whether a WARN logged upon thread exhaustion is allowed.
*
* @param allowExhaustionWarn {@code true} if a WARN upon thread exhaustion is allowed, {@code false} otherwise
* @return this builder
* @see EnhancedQueueExecutor#allowExhaustionWarn(boolean)
*/
public Builder allowExhaustionWarn(final boolean allowExhaustionWarn) {
this.allowExhaustionWarn = allowExhaustionWarn;
return this;
}

/**
* Determine whether a thread dump logged upon thread exhaustion is allowed.
*
* @return {@code true} if a thread dump upon thread exhaustion is allowed, {@code false} otherwise
* @see EnhancedQueueExecutor#allowsExhaustionDump()
*/
public boolean allowsExhaustionDump() {
return allowExhaustionDump;
}

/**
* Establish whether a thread dump logged upon thread exhaustion is allowed.
*
* @param allowExhaustionDump {@code true} if a thread dump upon thread exhaustion is allowed, {@code false} otherwise
* @return this builder
* @see EnhancedQueueExecutor#allowExhaustionDump(boolean)
*/
public Builder allowExhaustionDump(final boolean allowExhaustionDump) {
this.allowExhaustionDump = allowExhaustionDump;
return this;
}

/**
* Get the maximum queue size. If the queue is full and a task cannot be immediately accepted, rejection will result.
*
Expand Down Expand Up @@ -1514,6 +1578,9 @@ int tryAllocateThread(final float growthResistance) {
oldSize = currentSizeOf(oldStat);
if (oldSize >= maxSizeOf(oldStat)) {
// max threads already reached
if (reachedMax.compareAndSet(false, true) && isAllowExhaustionWarn()) {
ThreadDumpUtil.handleExhaustion(maxSizeOf(oldStat), isAllowExhaustionDump());
}
return AT_NO;
}
if (oldSize >= coreSizeOf(oldStat) && oldSize > 0) {
Expand Down Expand Up @@ -1935,6 +2002,14 @@ static long withShutdownInterrupt(final long status) {
return status | TS_SHUTDOWN_INTERRUPT;
}

private boolean withAllowExhaustionWarn(final boolean allowed) {
return allowExhaustionWarn = allowed;
}

private boolean withAllowExhaustionDump(final boolean allowed) {
return allowExhaustionDump = allowed;
}

static long withAllowCoreTimeout(final long status, final boolean allowed) {
return allowed ? status | TS_ALLOW_CORE_TIMEOUT : status & ~TS_ALLOW_CORE_TIMEOUT;
}
Expand All @@ -1955,6 +2030,14 @@ static boolean isAllowCoreTimeout(final long oldVal) {
return (oldVal & TS_ALLOW_CORE_TIMEOUT) != 0;
}

private boolean isAllowExhaustionWarn() {
return allowExhaustionWarn;
}

private boolean isAllowExhaustionDump() {
return allowExhaustionDump;
}

// =======================================================
// Static configuration
// =======================================================
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/jboss/threads/Messages.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ interface Messages extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
void taskSubmitFailed(@Cause RejectedExecutionException e, Runnable task);

@Message(id = 15, value = "%s")
@LogMessage(level = Logger.Level.WARN)
void exhaustedPoolMessage(String msg);

@Message(id = 16, value = "Exception thrown during generation of thread dump")
@LogMessage(level = Logger.Level.WARN)
void threadDumpException(@Cause Exception cause);

// validation

@Message(id = 100, value = "Keep-alive may only be set to 0 for this executor type")
Expand Down
131 changes: 131 additions & 0 deletions src/main/java/org/jboss/threads/ThreadDumpUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* JBoss, Home of Professional Open Source.
*
* Copyright 2022 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jboss.threads;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.LockInfo;
import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.StringBuilder;

final class ThreadDumpUtil {

public static void handleExhaustion(int maxPoolSize, boolean allowDump) {
StringBuilder msg = new StringBuilder();
msg.append(String.format("Thread pool has reached %d max threads in use. Performance may be impacted.", maxPoolSize));
if (allowDump) {
threadDump(msg);
}
Messages.msg.exhaustedPoolMessage(msg.toString());
}

public static void threadDump(StringBuilder sb) {
try {
sb.append(" Thread dump:\n" +
"*******************************************************************************\n");
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
for (ThreadInfo threadInfo : threadMXBean.dumpAllThreads(true, true)) {
sb.append(threadInfoToString(threadInfo));
}
sb.append("\n===============================================================================\n" +
"End Thread dump\n*******************************************************************************\n");

long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
if (deadlockedThreads != null && deadlockedThreads.length > 0) {
sb.append("Deadlock detected!\n"+
"*******************************************************************************\n" +
"{0}\n===============================================================================\n" +
"End Deadlock\n*******************************************************************************\n");
for (ThreadInfo threadInfo : threadMXBean.getThreadInfo(deadlockedThreads, true, true)) {
sb.append(threadInfoToString(threadInfo));
}
}
} catch (Exception e) {
Messages.msg.threadDumpException(e);
}
}

private static String threadInfoToString(ThreadInfo threadInfo) {
StringBuilder sb = new StringBuilder("\"" + threadInfo.getThreadName() + "\"" +
" Id=" + threadInfo.getThreadId() + " " +
threadInfo.getThreadState());
if (threadInfo.getLockName() != null) {
sb.append(" on " + threadInfo.getLockName());
}
if (threadInfo.getLockOwnerName() != null) {
sb.append(" owned by \"" + threadInfo.getLockOwnerName() +
"\" Id=" + threadInfo.getLockOwnerId());
}
if (threadInfo.isSuspended()) {
sb.append(" (suspended)");
}
if (threadInfo.isInNative()) {
sb.append(" (in native)");
}
sb.append('\n');
int i = 0;
for (; i < threadInfo.getStackTrace().length; i++) {
StackTraceElement ste = threadInfo.getStackTrace()[i];
sb.append("\tat " + ste.toString());
sb.append('\n');
if (i == 0 && threadInfo.getLockInfo() != null) {
Thread.State ts = threadInfo.getThreadState();
switch (ts) {
case BLOCKED:
sb.append("\t- blocked on " + threadInfo.getLockInfo());
sb.append('\n');
break;
case WAITING:
sb.append("\t- waiting on " + threadInfo.getLockInfo());
sb.append('\n');
break;
case TIMED_WAITING:
sb.append("\t- waiting on " + threadInfo.getLockInfo());
sb.append('\n');
break;
default:
}
}

for (MonitorInfo mi : threadInfo.getLockedMonitors()) {
if (mi.getLockedStackDepth() == i) {
sb.append("\t- locked " + mi);
sb.append('\n');
}
}
}

LockInfo[] locks = threadInfo.getLockedSynchronizers();
if (locks.length > 0) {
sb.append("\n\tNumber of locked synchronizers = " + locks.length);
sb.append('\n');
for (LockInfo li : locks) {
sb.append("\t- " + li);
sb.append('\n');
}
}
sb.append('\n');
return sb.toString();
}

}