Skip to content

Commit

Permalink
Dump Stacktrace on Slow IO-Thread Operations (#42000) (#42572)
Browse files Browse the repository at this point in the history
* Dump Stacktrace on Slow IO-Thread Operations

* Follow up to #39729 extending the functionality to actually dump the
stack when the thread is blocked not afterwards
   * Logging the stacktrace after the thread became unblocked is only of
limited use because it does not tell us what happened in the slow callback
(only whether we were blocked on a read, write, connect, etc.)
* Relates #41745
  • Loading branch information
original-brownbear authored and Christoph Büscher committed Jun 13, 2019
1 parent 234ec0c commit 44f2db8
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.nio.BytesChannelContext;
Expand All @@ -57,11 +58,16 @@
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.stream.Collectors;

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
Expand All @@ -70,13 +76,15 @@ public class MockNioTransport extends TcpTransport {
private static final Logger logger = LogManager.getLogger(MockNioTransport.class);

private final ConcurrentMap<String, MockTcpChannelFactory> profileToChannelFactory = newConcurrentMap();
private final TransportThreadWatchdog transportThreadWatchdog;
private volatile NioSelectorGroup nioGroup;
private volatile MockTcpChannelFactory clientChannelFactory;

/**
 * Creates the mock NIO transport.
 *
 * All arguments are forwarded to the {@code TcpTransport} super constructor (note that the super call takes them in a
 * different order than this constructor declares them). In addition, a {@link TransportThreadWatchdog} is created from
 * the given thread pool; it is handed to the event handlers in {@code doStart()} and stopped in {@code stopInternal()}.
 */
public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
// The watchdog constructor schedules its first periodic check on the thread pool right away.
this.transportThreadWatchdog = new TransportThreadWatchdog(threadPool);
}

@Override
Expand All @@ -96,7 +104,7 @@ protected void doStart() {
boolean success = false;
try {
nioGroup = new NioSelectorGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
(s) -> new TestEventHandler(this::onNonChannelException, s, System::nanoTime));
(s) -> new TestEventHandler(this::onNonChannelException, s, transportThreadWatchdog));

ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
clientChannelFactory = new MockTcpChannelFactory(true, clientProfileSettings, "client");
Expand Down Expand Up @@ -125,6 +133,7 @@ protected void doStart() {
@Override
protected void stopInternal() {
try {
transportThreadWatchdog.stop();
nioGroup.close();
} catch (Exception e) {
logger.warn("unexpected exception while stopping nio group", e);
Expand Down Expand Up @@ -319,4 +328,64 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener));
}
}

/**
 * Watchdog that tracks how long the current thread spends between a {@link #register()} and the matching
 * {@link #unregister()} call and warns about slow executions.
 *
 * Two kinds of warnings are produced:
 * <ul>
 *   <li>on {@link #unregister()}, if the execution that just finished exceeded the warn threshold;</li>
 *   <li>periodically (every {@code CHECK_INTERVAL}), for every thread that is still registered and has exceeded the
 *       threshold. This one dumps the thread's current stack trace, i.e. it shows where the thread is blocked while it
 *       is still blocked.</li>
 * </ul>
 *
 * All start times kept in the registry are values of {@code ThreadPool#relativeTimeInNanos()}; every comparison against
 * them must therefore use the same nanosecond clock.
 */
static final class TransportThreadWatchdog {

    // Threshold in nanoseconds above which an execution is reported as slow.
    private static final long WARN_THRESHOLD = TimeUnit.MILLISECONDS.toNanos(150);

    // Only check every 2s to not flood the logs on a blocked thread.
    // We mostly care about long blocks and not random slowness anyway. Operations that block for less than 2s are still
    // caught by the periodic check now and then in tests, and are always reported by unregister() once they complete.
    private static final TimeValue CHECK_INTERVAL = TimeValue.timeValueSeconds(2);

    private final ThreadPool threadPool;

    // Maps each currently registered thread to the relative time (in nanoseconds) at which it registered.
    private final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();

    private volatile boolean stopped;

    /**
     * Creates the watchdog and schedules the first periodic check on the generic thread pool.
     *
     * @param threadPool thread pool used both as the time source and to schedule the periodic check
     */
    TransportThreadWatchdog(ThreadPool threadPool) {
        this.threadPool = threadPool;
        threadPool.schedule(this::logLongRunningExecutions, CHECK_INTERVAL, ThreadPool.Names.GENERIC);
    }

    /**
     * Records the current relative time as the start time for the calling thread.
     *
     * @return {@code true} if the calling thread was not registered before, in which case the caller is expected to call
     *         {@link #unregister()} when done; {@code false} if the thread already had an entry
     */
    public boolean register() {
        // NOTE(review): put() replaces the start time of an already registered thread, so a nested registration restarts
        // the measurement of the outer execution -- confirm this is intended (putIfAbsent would keep the outer start time).
        Long previousValue = registry.put(Thread.currentThread(), threadPool.relativeTimeInNanos());
        return previousValue == null;
    }

    /**
     * Removes the calling thread from the registry and logs a warning if the time since its registration exceeded the
     * warn threshold.
     */
    public void unregister() {
        Long previousValue = registry.remove(Thread.currentThread());
        assert previousValue != null;
        // With assertions disabled, do not unbox a missing entry: callers invoke this from finally blocks, where a
        // NullPointerException would mask the exception that is actually propagating.
        if (previousValue != null) {
            maybeLogElapsedTime(previousValue);
        }
    }

    /**
     * Logs a warning (with a stack trace of the calling thread) if more than the warn threshold has elapsed.
     *
     * @param startTime start of the execution as a relative time in nanoseconds
     */
    private void maybeLogElapsedTime(long startTime) {
        long elapsedTime = threadPool.relativeTimeInNanos() - startTime;
        if (elapsedTime > WARN_THRESHOLD) {
            logger.warn(
                new ParameterizedMessage("Slow execution on network thread [{} milliseconds]",
                    TimeUnit.NANOSECONDS.toMillis(elapsedTime)),
                new RuntimeException("Slow exception on network thread"));
        }
    }

    /**
     * Periodic check: logs the current stack trace of every registered thread whose execution has exceeded the warn
     * threshold, then reschedules itself unless {@link #stop()} has been called.
     */
    private void logLongRunningExecutions() {
        for (Map.Entry<Thread, Long> entry : registry.entrySet()) {
            // The registered start time is in nanoseconds, so the current time must be read from the nanosecond clock as
            // well; mixing in relativeTimeInMillis() here yields a meaningless difference.
            final long elapsedTimeInNanos = threadPool.relativeTimeInNanos() - entry.getValue();
            if (elapsedTimeInNanos > WARN_THRESHOLD) {
                final Thread thread = entry.getKey();
                logger.warn("Slow execution on network thread [{}] [{} milliseconds]: \n{}", thread.getName(),
                    TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos),
                    Arrays.stream(thread.getStackTrace()).map(Object::toString).collect(Collectors.joining("\n")));
            }
        }
        if (stopped == false) {
            threadPool.scheduleUnlessShuttingDown(CHECK_INTERVAL, ThreadPool.Names.GENERIC, this::logLongRunningExecutions);
        }
    }

    /**
     * Stops the periodic check from rescheduling itself. A check that is already scheduled may still run once.
     */
    public void stop() {
        stopped = true;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

package org.elasticsearch.transport.nio;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.nio.ChannelContext;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.NioSelector;
Expand All @@ -32,185 +29,202 @@
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class TestEventHandler extends EventHandler {

private static final Logger logger = LogManager.getLogger(TestEventHandler.class);

private final Set<SocketChannelContext> hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>());
private final Set<SocketChannelContext> hasConnectExceptionMap = Collections.newSetFromMap(new WeakHashMap<>());
private final LongSupplier relativeNanosSupplier;
private final MockNioTransport.TransportThreadWatchdog transportThreadWatchdog;

TestEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier, LongSupplier relativeNanosSupplier) {
TestEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier,
MockNioTransport.TransportThreadWatchdog transportThreadWatchdog) {
super(exceptionHandler, selectorSupplier);
this.relativeNanosSupplier = relativeNanosSupplier;
this.transportThreadWatchdog = transportThreadWatchdog;
}

@Override
protected void acceptChannel(ServerChannelContext context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.acceptChannel(context);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void acceptException(ServerChannelContext context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.acceptException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void handleRegistration(ChannelContext<?> context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.handleRegistration(context);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void registrationException(ChannelContext<?> context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.registrationException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

public void handleConnect(SocketChannelContext context) throws IOException {
assert hasConnectedMap.contains(context) == false : "handleConnect should only be called is a channel is not yet connected";
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.handleConnect(context);
if (context.isConnectComplete()) {
hasConnectedMap.add(context);
}
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

public void connectException(SocketChannelContext context, Exception e) {
assert hasConnectExceptionMap.contains(context) == false : "connectException should only called at maximum once per channel";
final boolean registered = transportThreadWatchdog.register();
hasConnectExceptionMap.add(context);
long startTime = relativeNanosSupplier.getAsLong();
try {
super.connectException(context, e);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void handleRead(SocketChannelContext context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.handleRead(context);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void readException(SocketChannelContext context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.readException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void handleWrite(SocketChannelContext context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.handleWrite(context);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void writeException(SocketChannelContext context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.writeException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void handleTask(Runnable task) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.handleTask(task);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void taskException(Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.taskException(exception);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void handleClose(ChannelContext<?> context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.handleClose(context);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void closeException(ChannelContext<?> context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.closeException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void genericChannelException(ChannelContext<?> context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.genericChannelException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
}
}

private static final long WARN_THRESHOLD = 150;

private void maybeLogElapsedTime(long startTime) {
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(relativeNanosSupplier.getAsLong() - startTime);
if (elapsedTime > WARN_THRESHOLD) {
logger.warn(new ParameterizedMessage("Slow execution on network thread [{} milliseconds]", elapsedTime),
new RuntimeException("Slow exception on network thread"));
if (registered) {
transportThreadWatchdog.unregister();
}
}
}
}
Loading

0 comments on commit 44f2db8

Please sign in to comment.