Skip to content

Commit

Permalink
Log exceptions in TcpTransport at DEBUG level (elastic#51612)
Browse files Browse the repository at this point in the history
When running Elasticsearch on a flaky network, we may see nodes leaving the
cluster with reason `disconnected`. It may be useful to the cluster
administrator to see the full exception that caused the disconnection, but this
is only available with `TRACE`-level logging, which commingles the details of
the problem with other messages that are not useful to end users.

This commit promotes logging of exceptions in `TcpTransport` from `TRACE` to
`DEBUG` to separate them from the truly `TRACE`-level messages.
  • Loading branch information
DaveCTurner committed Jan 31, 2020
1 parent 77b00fc commit 72ae0ca
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 6 deletions.
18 changes: 12 additions & 6 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.metrics.MeanMetric;
Expand Down Expand Up @@ -577,27 +578,32 @@ protected final void doStop() {
}

/**
 * Entry point for exceptions raised on a transport channel. Delegates to the static
 * {@code handleException}, passing along the {@code lifecycle} and {@code outboundHandler}
 * visible in this scope (presumably fields of this transport - confirm against the full class).
 *
 * @param channel the channel on which the exception was caught
 * @param e       the exception that was caught
 */
public void onException(TcpChannel channel, Exception e) {
handleException(channel, e, lifecycle, outboundHandler);
}

// exposed for tests
static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle, OutboundHandler outboundHandler) {
if (!lifecycle.started()) {
// just close and ignore - we are already stopped and just need to make sure we release all resources
CloseableChannel.closeChannel(channel);
return;
}

if (isCloseConnectionException(e)) {
logger.trace(() -> new ParameterizedMessage(
logger.debug(() -> new ParameterizedMessage(
"close connection exception caught on transport layer [{}], disconnecting from relevant node", channel), e);
// close the channel, which will cause a node to be disconnected if relevant
CloseableChannel.closeChannel(channel);
} else if (isConnectException(e)) {
logger.trace(() -> new ParameterizedMessage("connect exception caught on transport layer [{}]", channel), e);
logger.debug(() -> new ParameterizedMessage("connect exception caught on transport layer [{}]", channel), e);
// close the channel as safe measure, which will cause a node to be disconnected if relevant
CloseableChannel.closeChannel(channel);
} else if (e instanceof BindException) {
logger.trace(() -> new ParameterizedMessage("bind exception caught on transport layer [{}]", channel), e);
logger.debug(() -> new ParameterizedMessage("bind exception caught on transport layer [{}]", channel), e);
// close the channel as safe measure, which will cause a node to be disconnected if relevant
CloseableChannel.closeChannel(channel);
} else if (e instanceof CancelledKeyException) {
logger.trace(() -> new ParameterizedMessage(
logger.debug(() -> new ParameterizedMessage(
"cancelled key exception caught on transport layer [{}], disconnecting from relevant node", channel), e);
// close the channel as safe measure, which will cause a node to be disconnected if relevant
CloseableChannel.closeChannel(channel);
Expand All @@ -619,7 +625,7 @@ public void onException(TcpChannel channel, Exception e) {

protected void onServerException(TcpServerChannel channel, Exception e) {
if (e instanceof BindException) {
logger.trace(() -> new ParameterizedMessage("bind exception from server channel caught on transport layer [{}]", channel), e);
logger.debug(() -> new ParameterizedMessage("bind exception from server channel caught on transport layer [{}]", channel), e);
} else {
logger.error(new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);
}
Expand Down Expand Up @@ -817,7 +823,7 @@ private static boolean bufferStartsWith(BytesReference buffer, String method) {
*/
public static class HttpRequestOnTransportException extends ElasticsearchException {

private HttpRequestOnTransportException(String msg) {
HttpRequestOnTransportException(String msg) {
super(msg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,41 @@

package org.elasticsearch.transport;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matcher;

import java.io.IOException;
import java.io.StreamCorruptedException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;

/** Unit tests for {@link TcpTransport} */
Expand Down Expand Up @@ -354,4 +369,83 @@ public void testHTTPResponse() throws IOException {
"(not HTTP port) of a remote node is specified in the configuration", ex.getMessage());
}
}

/**
 * Exercises {@code TcpTransport.handleException} with a series of exceptions and verifies, for each one,
 * which log events are (or are not) emitted on the {@code TcpTransport} logger and whether the channel
 * ends up closed.
 */
@TestLogging(reason = "testing logging", value = "org.elasticsearch.transport.TcpTransport:DEBUG")
public void testExceptionHandling() throws IllegalAccessException {
    final String expectationName = "message";
    final String transportLogger = "org.elasticsearch.transport.TcpTransport";

    // Transport not started: the channel is closed and nothing is logged at DEBUG or above.
    testExceptionHandling(false, new ElasticsearchException("simulated"), true,
        new MockLogAppender.UnseenEventExpectation(expectationName, transportLogger, Level.ERROR, "*"),
        new MockLogAppender.UnseenEventExpectation(expectationName, transportLogger, Level.WARN, "*"),
        new MockLogAppender.UnseenEventExpectation(expectationName, transportLogger, Level.INFO, "*"),
        new MockLogAppender.UnseenEventExpectation(expectationName, transportLogger, Level.DEBUG, "*"));

    // Generic exception on a started transport: logged at WARN.
    testExceptionHandling(
        new ElasticsearchException("simulated"),
        new MockLogAppender.SeenEventExpectation(expectationName, transportLogger, Level.WARN,
            "exception caught on transport layer [*], closing connection"));

    // Close-connection style exceptions: logged at DEBUG.
    testExceptionHandling(
        new ClosedChannelException(),
        new MockLogAppender.SeenEventExpectation(expectationName, transportLogger, Level.DEBUG,
            "close connection exception caught on transport layer [*], disconnecting from relevant node"));
    testExceptionHandling(
        new ElasticsearchException("Connection reset"),
        new MockLogAppender.SeenEventExpectation(expectationName, transportLogger, Level.DEBUG,
            "close connection exception caught on transport layer [*], disconnecting from relevant node"));

    // Bind exception: logged at DEBUG.
    testExceptionHandling(
        new BindException(),
        new MockLogAppender.SeenEventExpectation(expectationName, transportLogger, Level.DEBUG,
            "bind exception caught on transport layer [*]"));

    // Cancelled key exception: logged at DEBUG.
    testExceptionHandling(
        new CancelledKeyException(),
        new MockLogAppender.SeenEventExpectation(expectationName, transportLogger, Level.DEBUG,
            "cancelled key exception caught on transport layer [*], disconnecting from relevant node"));

    // HTTP request on the transport port: the channel is expected to still be open when handleException
    // returns, and nothing is logged at DEBUG or above.
    testExceptionHandling(true, new TcpTransport.HttpRequestOnTransportException("test"), false,
        new MockLogAppender.UnseenEventExpectation(expectationName, transportLogger, Level.ERROR, "*"),
        new MockLogAppender.UnseenEventExpectation(expectationName, transportLogger, Level.WARN, "*"),
        new MockLogAppender.UnseenEventExpectation(expectationName, transportLogger, Level.INFO, "*"),
        new MockLogAppender.UnseenEventExpectation(expectationName, transportLogger, Level.DEBUG, "*"));

    // Stream corruption: logged at WARN with the exception's own message.
    testExceptionHandling(
        new StreamCorruptedException("simulated"),
        new MockLogAppender.SeenEventExpectation(expectationName, transportLogger, Level.WARN,
            "simulated, [*], closing connection"));
}

/**
 * Convenience overload for the common scenario: delegates to the four-argument variant with
 * {@code startTransport = true} and {@code expectClosed = true}.
 *
 * @param exception    the exception to hand to {@code TcpTransport.handleException}
 * @param expectations the logging expectations that must all match afterwards
 */
private void testExceptionHandling(Exception exception,
MockLogAppender.LoggingExpectation... expectations) throws IllegalAccessException {
testExceptionHandling(true, exception, true, expectations);
}

/**
 * Passes {@code exception} to {@code TcpTransport.handleException} on a fake channel while a mock appender
 * is attached to the {@code TcpTransport} logger, then checks the channel's close state and the
 * supplied logging expectations.
 *
 * @param startTransport whether the {@code Lifecycle} handed to the handler is moved to started first
 * @param exception      the exception to handle
 * @param expectClosed   whether the channel's close listener must be completed once handling returns
 * @param expectations   logging expectations registered on the mock appender before handling
 */
private void testExceptionHandling(boolean startTransport, Exception exception, boolean expectClosed,
                                   MockLogAppender.LoggingExpectation... expectations) throws IllegalAccessException {
    final TestThreadPool threadPool = new TestThreadPool("test");
    final MockLogAppender mockAppender = new MockLogAppender();

    try {
        mockAppender.start();
        Loggers.addAppender(LogManager.getLogger(TcpTransport.class), mockAppender);
        for (int i = 0; i < expectations.length; i++) {
            mockAppender.addExpectation(expectations[i]);
        }

        // The lifecycle stays in its initial state unless the caller asks for a started transport.
        final Lifecycle transportLifecycle = new Lifecycle();
        if (startTransport) {
            transportLifecycle.moveToStarted();
        }

        // The future is registered as a close listener so the test can observe whether the channel was closed.
        final FakeTcpChannel fakeChannel = new FakeTcpChannel();
        final PlainActionFuture<Void> closeFuture = new PlainActionFuture<>();
        fakeChannel.addCloseListener(closeFuture);

        final OutboundHandler outboundHandler = new OutboundHandler(
            randomAlphaOfLength(10), Version.CURRENT, threadPool, BigArrays.NON_RECYCLING_INSTANCE);
        TcpTransport.handleException(fakeChannel, exception, transportLifecycle, outboundHandler);

        if (expectClosed == false) {
            assertFalse(closeFuture.isDone());
        } else {
            assertTrue(closeFuture.isDone());
            assertThat(closeFuture.actionGet(), nullValue());
        }

        mockAppender.assertAllExpectationsMatched();
    } finally {
        // Detach and stop the appender before shutting the pool down, so later tests see a clean logger.
        Loggers.removeAppender(LogManager.getLogger(TcpTransport.class), mockAppender);
        mockAppender.stop();
        ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
    }
}
}

0 comments on commit 72ae0ca

Please sign in to comment.