Skip to content

Commit

Permalink
Invoke response handler on failure to send (#53631)
Browse files Browse the repository at this point in the history
Today it can happen that a transport message fails to send (for example,
because a transport interceptor rejects the request). In this case, the
response handler is never invoked, which can lead to necessary cleanups
not being performed. There are two ways to handle this. One is to expect
every callsite that sends a message to try/catch these exceptions and
handle them appropriately. The other is merely to invoke the response
handler to handle the exception, which is already equipped to handle
transport exceptions.
  • Loading branch information
jasontedor committed Mar 17, 2020
1 parent 01af65d commit 4e3d7ee
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportSettings;

import java.net.InetAddress;
Expand All @@ -46,12 +47,14 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsString;

public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase {

public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
ClusterSettings clusterSettings, boolean doHandshake) {
ClusterSettings clusterSettings, boolean doHandshake,
TransportInterceptor interceptor) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
Transport transport = new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
Expand All @@ -66,16 +69,28 @@ public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionP
}
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
MockTransportService mockTransportService = MockTransportService.createNewService(
settings,
transport,
version,
threadPool,
clusterSettings,
Collections.emptySet(),
interceptor);
mockTransportService.start();
return mockTransportService;
}

@Override
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
protected MockTransportService build(
Settings settings,
Version version,
ClusterSettings clusterSettings,
boolean doHandshake,
TransportInterceptor interceptor) {
settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build();
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake);
MockTransportService transportService =
nettyFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake, interceptor);
transportService.start();
return transportService;
}
Expand Down Expand Up @@ -103,7 +118,7 @@ public void testBindUnavailableAddress() {
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
BindTransportException bindTransportException = expectThrows(BindTransportException.class, () -> {
MockTransportService transportService =
nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings, true);
nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings, true, NOOP_TRANSPORT_INTERCEPTOR);
try {
transportService.start();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -512,37 +511,57 @@ public <T extends TransportResponse> TransportFuture<T> submitRequest(DiscoveryN
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportResponseHandler<T> handler) {
final Transport.Connection connection;
try {
Transport.Connection connection = getConnection(node);
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
} catch (NodeNotConnectedException ex) {
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
return;
}
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
}

public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
final Transport.Connection connection;
try {
Transport.Connection connection = getConnection(node);
sendRequest(connection, action, request, options, handler);
} catch (NodeNotConnectedException ex) {
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
return;
}
sendRequest(connection, action, request, options, handler);
}

/**
* Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
*
* @param connection the connection to send the request on
* @param action the name of the action
* @param request the request
* @param options the options for this request
* @param handler the response handler
* @param <T> the type of the transport response
*/
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
try {
asyncSender.sendRequest(connection, action, request, options, handler);
} catch (NodeNotConnectedException ex) {
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
}
handler.handleException(te);
}
}

Expand All @@ -562,13 +581,15 @@ public final <T extends TransportResponse> void sendChildRequest(final Discovery
final TransportRequest request, final Task parentTask,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
final Transport.Connection connection;
try {
Transport.Connection connection = getConnection(node);
sendChildRequest(connection, action, request, parentTask, options, handler);
} catch (NodeNotConnectedException ex) {
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
return;
}
sendChildRequest(connection, action, request, parentTask, options, handler);
}

public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
Expand All @@ -582,16 +603,7 @@ public <T extends TransportResponse> void sendChildRequest(final Transport.Conne
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
request.setParentTask(localNode.getId(), parentTask.getId());
try {
sendRequest(connection, action, request, options, handler);
} catch (TaskCancelledException ex) {
// The parent task is already cancelled - just fail the request
handler.handleException(new TransportException(ex));
} catch (NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
}

sendRequest(connection, action, request, options, handler);
}

private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,13 @@ public static MockTcpTransport newMockTransport(Settings settings, Version versi

public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
return createNewService(settings, transport, version, threadPool, clusterSettings, taskHeaders, NOOP_TRANSPORT_INTERCEPTOR);
}

public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders,
TransportInterceptor interceptor) {
return new MockTransportService(settings, transport, threadPool, interceptor,
boundAddress ->
new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(),
Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,15 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
Expand All @@ -111,7 +114,16 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
protected volatile DiscoveryNode nodeB;
protected volatile MockTransportService serviceB;

protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake);
private MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
return build(settings, version, clusterSettings, doHandshake, NOOP_TRANSPORT_INTERCEPTOR);
}

protected abstract MockTransportService build(
Settings settings,
Version version,
ClusterSettings clusterSettings,
boolean doHandshake,
TransportInterceptor interceptor);

protected int channelsPerNodeConnection() {
// This is a customized profile for this test case.
Expand Down Expand Up @@ -165,6 +177,12 @@ public void onNodeDisconnected(DiscoveryNode node) {

private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings,
Settings settings, boolean acceptRequests, boolean doHandshake) {
return buildService(name, version, clusterSettings, settings, acceptRequests, doHandshake, NOOP_TRANSPORT_INTERCEPTOR);
}

private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings,
Settings settings, boolean acceptRequests, boolean doHandshake,
TransportInterceptor interceptor) {
MockTransportService service = build(
Settings.builder()
.put(settings)
Expand All @@ -173,7 +191,7 @@ private MockTransportService buildService(final String name, final Version versi
.put(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
.build(),
version,
clusterSettings, doHandshake);
clusterSettings, doHandshake, interceptor);
if (acceptRequests) {
service.acceptIncomingRequests();
}
Expand All @@ -186,7 +204,7 @@ protected MockTransportService buildService(final String name, final Version ver

protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings,
Settings settings) {
return buildService(name, version, clusterSettings, settings, true, true);
return buildService(name, version, clusterSettings, settings, true, true, NOOP_TRANSPORT_INTERCEPTOR);
}

@Override
Expand Down Expand Up @@ -2731,6 +2749,80 @@ public void onConnectionClosed(Transport.Connection connection) {
}
}

// test that the response handler is invoked on a failure to send
public void testFailToSend() throws InterruptedException {
final RuntimeException failToSendException;
if (randomBoolean()) {
failToSendException = new IllegalStateException("fail to send");
} else {
failToSendException = new TransportException("fail to send");
}
final TransportInterceptor interceptor = new TransportInterceptor() {
@Override
public AsyncSender interceptSender(final AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
final Transport.Connection connection,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
if ("fail-to-send-action".equals(action)) {
throw failToSendException;
} else {
sender.sendRequest(connection, action, request, options, handler);
}
}
};
}
};
try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, null, Settings.EMPTY, true, true, interceptor)) {
serviceC.start();
serviceC.acceptIncomingRequests();
serviceC.connectToNode(serviceA.getLocalDiscoNode(), ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY));
final AtomicReference<TransportException> te = new AtomicReference<>();
final Transport.Connection connection = serviceC.getConnection(nodeA);
serviceC.sendRequest(
connection,
"fail-to-send-action",
TransportRequest.Empty.INSTANCE,
TransportRequestOptions.EMPTY,
new TransportResponseHandler<TransportResponse>() {
@Override
public void handleResponse(final TransportResponse response) {
fail("handle response should not be invoked");
}

@Override
public void handleException(final TransportException exp) {
te.set(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public TransportResponse read(final StreamInput in) {
return TransportResponse.Empty.INSTANCE;
}
});
assertThat(te.get(), not(nullValue()));

if (failToSendException instanceof IllegalStateException) {
assertThat(te.get().getMessage(), equalTo("failure to send"));
assertThat(te.get().getCause(), instanceOf(IllegalStateException.class));
assertThat(te.get().getCause().getMessage(), equalTo("fail to send"));
} else {
assertThat(te.get().getMessage(), equalTo("fail to send"));
assertThat(te.get().getCause(), nullValue());
}
}

}

private void closeConnectionChannel(Transport.Connection connection) {
StubbableTransport.WrappedConnection wrappedConnection = (StubbableTransport.WrappedConnection) connection;
TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) wrappedConnection.getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {

@Override
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake,
TransportInterceptor interceptor) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version) {
Expand All @@ -49,8 +50,14 @@ public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionP
}
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());
MockTransportService mockTransportService = MockTransportService.createNewService(
settings,
transport,
version,
threadPool,
clusterSettings,
Collections.emptySet(),
interceptor);
mockTransportService.start();
return mockTransportService;
}
Expand Down
Loading

0 comments on commit 4e3d7ee

Please sign in to comment.