Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dump Stacktrace on Slow IO-Thread Operations #42000

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.nio.BytesChannelContext;
Expand All @@ -57,11 +58,16 @@
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.stream.Collectors;

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
Expand All @@ -70,13 +76,15 @@ public class MockNioTransport extends TcpTransport {
private static final Logger logger = LogManager.getLogger(MockNioTransport.class);

private final ConcurrentMap<String, MockTcpChannelFactory> profileToChannelFactory = newConcurrentMap();
private final TransportThreadWatchdog transportThreadWatchdog;
private volatile NioSelectorGroup nioGroup;
private volatile MockTcpChannelFactory clientChannelFactory;

public MockNioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
this.transportThreadWatchdog = new TransportThreadWatchdog(threadPool);
}

@Override
Expand All @@ -96,7 +104,7 @@ protected void doStart() {
boolean success = false;
try {
nioGroup = new NioSelectorGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
(s) -> new TestEventHandler(this::onNonChannelException, s, System::nanoTime));
(s) -> new TestEventHandler(this::onNonChannelException, s, transportThreadWatchdog));

ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
clientChannelFactory = new MockTcpChannelFactory(true, clientProfileSettings, "client");
Expand Down Expand Up @@ -125,6 +133,7 @@ protected void doStart() {
@Override
protected void stopInternal() {
try {
transportThreadWatchdog.stop();
nioGroup.close();
} catch (Exception e) {
logger.warn("unexpected exception while stopping nio group", e);
Expand Down Expand Up @@ -311,4 +320,64 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener));
}
}

static final class TransportThreadWatchdog {

private static final long WARN_THRESHOLD = TimeUnit.MILLISECONDS.toNanos(150);

// Only check every 2s to not flood the logs on a blocked thread.
// We mostly care about long blocks and not random slowness anyway and in tests would randomly catch slow operations that block for
// less than 2s eventually.
private static final TimeValue CHECK_INTERVAL = TimeValue.timeValueSeconds(2);

private final ThreadPool threadPool;
private final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();

private volatile boolean stopped;

TransportThreadWatchdog(ThreadPool threadPool) {
this.threadPool = threadPool;
threadPool.schedule(this::logLongRunningExecutions, CHECK_INTERVAL, ThreadPool.Names.GENERIC);
}

public boolean register() {
Long previousValue = registry.put(Thread.currentThread(), threadPool.relativeTimeInNanos());
return previousValue == null;
}

public void unregister() {
Long previousValue = registry.remove(Thread.currentThread());
assert previousValue != null;
maybeLogElapsedTime(previousValue);
}

private void maybeLogElapsedTime(long startTime) {
long elapsedTime = threadPool.relativeTimeInNanos() - startTime;
if (elapsedTime > WARN_THRESHOLD) {
logger.warn(
new ParameterizedMessage("Slow execution on network thread [{} milliseconds]",
TimeUnit.NANOSECONDS.toMillis(elapsedTime)),
new RuntimeException("Slow exception on network thread"));
}
}

private void logLongRunningExecutions() {
for (Map.Entry<Thread, Long> entry : registry.entrySet()) {
final long elapsedTime = threadPool.relativeTimeInMillis() - entry.getValue();
if (elapsedTime > WARN_THRESHOLD) {
final Thread thread = entry.getKey();
logger.warn("Slow execution on network thread [{}] [{} milliseconds]: \n{}", thread.getName(),
TimeUnit.NANOSECONDS.toMillis(elapsedTime),
Arrays.stream(thread.getStackTrace()).map(Object::toString).collect(Collectors.joining("\n")));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this not cause issues with the security manager to call getStackTrace?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's because we grant

  // needed for top threads handling
  permission org.elasticsearch.secure_sm.ThreadPermission "modifyArbitraryThreadGroup";

to codeBase "${codebase.randomizedtesting-runner}" { right? (in test-framework.policy)

We actually do use the same code to get stack traces in other tests too so I'm sure it works fine with the SM (+ manually verified it).

}
}
if (stopped == false) {
threadPool.schedule(this::logLongRunningExecutions, CHECK_INTERVAL, ThreadPool.Names.GENERIC);
}
}

public void stop() {
stopped = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

package org.elasticsearch.transport.nio;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.nio.ChannelContext;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.NioSelector;
Expand All @@ -32,185 +29,202 @@
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class TestEventHandler extends EventHandler {

private static final Logger logger = LogManager.getLogger(TestEventHandler.class);

private final Set<SocketChannelContext> hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>());
private final Set<SocketChannelContext> hasConnectExceptionMap = Collections.newSetFromMap(new WeakHashMap<>());
private final LongSupplier relativeNanosSupplier;
private final MockNioTransport.TransportThreadWatchdog transportThreadWatchdog;

TestEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier, LongSupplier relativeNanosSupplier) {
TestEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier,
MockNioTransport.TransportThreadWatchdog transportThreadWatchdog) {
super(exceptionHandler, selectorSupplier);
this.relativeNanosSupplier = relativeNanosSupplier;
this.transportThreadWatchdog = transportThreadWatchdog;
}

@Override
protected void acceptChannel(ServerChannelContext context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.acceptChannel(context);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void acceptException(ServerChannelContext context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.acceptException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void handleRegistration(ChannelContext<?> context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.handleRegistration(context);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void registrationException(ChannelContext<?> context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.registrationException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

public void handleConnect(SocketChannelContext context) throws IOException {
assert hasConnectedMap.contains(context) == false : "handleConnect should only be called is a channel is not yet connected";
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.handleConnect(context);
if (context.isConnectComplete()) {
hasConnectedMap.add(context);
}
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

public void connectException(SocketChannelContext context, Exception e) {
assert hasConnectExceptionMap.contains(context) == false : "connectException should only called at maximum once per channel";
final boolean registered = transportThreadWatchdog.register();
hasConnectExceptionMap.add(context);
long startTime = relativeNanosSupplier.getAsLong();
try {
super.connectException(context, e);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void handleRead(SocketChannelContext context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.handleRead(context);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void readException(SocketChannelContext context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.readException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void handleWrite(SocketChannelContext context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.handleWrite(context);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void writeException(SocketChannelContext context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.writeException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void handleTask(Runnable task) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.handleTask(task);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void taskException(Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.taskException(exception);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void handleClose(ChannelContext<?> context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.handleClose(context);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void closeException(ChannelContext<?> context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.closeException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
if (registered) {
transportThreadWatchdog.unregister();
}
}
}

@Override
protected void genericChannelException(ChannelContext<?> context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
final boolean registered = transportThreadWatchdog.register();
try {
super.genericChannelException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
}
}

private static final long WARN_THRESHOLD = 150;

private void maybeLogElapsedTime(long startTime) {
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(relativeNanosSupplier.getAsLong() - startTime);
if (elapsedTime > WARN_THRESHOLD) {
logger.warn(new ParameterizedMessage("Slow execution on network thread [{} milliseconds]", elapsedTime),
new RuntimeException("Slow exception on network thread"));
if (registered) {
transportThreadWatchdog.unregister();
}
}
}
}
Loading