From 41e3b4aa905b053c80d52618ca23fa5919ae818e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 16 Mar 2020 21:27:02 -0400 Subject: [PATCH] Invoke response handler on failure to send (#53631) Today it can happen that a transport message fails to send (for example, because a transport interceptor rejects the request). In this case, the response handler is never invoked, which can lead to necessary cleanups not being performed. There are two ways to handle this. One is to expect every callsite that sends a message to try/catch these exceptions and handle them appropriately. The other is merely to invoke the response handler to handle the exception, which is already equipped to handle transport exceptions. --- .../transport/TransportService.java | 56 ++++++---- .../test/transport/MockTransportService.java | 8 +- .../AbstractSimpleTransportTestCase.java | 102 +++++++++++++++++- 3 files changed, 141 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 0f9b25cc9c617..dcf9b3595c80e 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -46,7 +46,6 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -519,37 +518,57 @@ public void removeConnectionListener(TransportConnectionListener listener) { public void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, final TransportResponseHandler handler) { + final Transport.Connection connection; try { - Transport.Connection connection = getConnection(node); - sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler); - } catch (NodeNotConnectedException ex) { + connection = getConnection(node); + } catch (final NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler handler.handleException(ex); + return; } + sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler); } public final void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { + final Transport.Connection connection; try { - Transport.Connection connection = getConnection(node); - sendRequest(connection, action, request, options, handler); - } catch (NodeNotConnectedException ex) { + connection = getConnection(node); + } catch (final NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler handler.handleException(ex); + return; } + sendRequest(connection, action, request, options, handler); } + /** + * Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked. + * + * @param connection the connection to send the request on + * @param action the name of the action + * @param request the request + * @param options the options for this request + * @param handler the response handler + * @param the type of the transport response + */ public final void sendRequest(final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { try { asyncSender.sendRequest(connection, action, request, options, handler); - } catch (NodeNotConnectedException ex) { + } catch (final Exception ex) { // the caller might not handle this so we invoke the handler - handler.handleException(ex); + final TransportException te; + if (ex instanceof TransportException) { + te = (TransportException) ex; + } else { + te = new TransportException("failure to send", ex); + } + handler.handleException(te); } } @@ -569,13 +588,15 @@ public final void sendChildRequest(final Discovery final TransportRequest request, final Task parentTask, final TransportRequestOptions options, final TransportResponseHandler handler) { + final Transport.Connection connection; try { - Transport.Connection connection = getConnection(node); - sendChildRequest(connection, action, request, parentTask, options, handler); - } catch (NodeNotConnectedException ex) { + connection = getConnection(node); + } catch (final NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler handler.handleException(ex); + return; } + sendChildRequest(connection, action, request, parentTask, options, handler); } public void sendChildRequest(final Transport.Connection connection, final String action, @@ -589,16 +610,7 @@ public void sendChildRequest(final Transport.Conne final TransportRequestOptions options, final TransportResponseHandler handler) { request.setParentTask(localNode.getId(), parentTask.getId()); - try { - sendRequest(connection, action, request, options, handler); - } catch (TaskCancelledException ex) { - // The parent task is already cancelled - just fail the request - handler.handleException(new TransportException(ex)); - } catch (NodeNotConnectedException ex) { - // the caller might not handle this so we invoke the handler - handler.handleException(ex); - } - + sendRequest(connection, action, request, options, handler); } private void sendRequestInternal(final Transport.Connection connection, final String action, diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index f90727c936e00..0867bb4abec1d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -120,7 +120,13 @@ public static MockNioTransport newMockTransport(Settings settings, Version versi public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { - return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, + return createNewService(settings, transport, version, threadPool, clusterSettings, taskHeaders, NOOP_TRANSPORT_INTERCEPTOR); + } + + public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, + @Nullable ClusterSettings clusterSettings, Set taskHeaders, + TransportInterceptor interceptor) { + return new MockTransportService(settings, transport, threadPool, interceptor, boundAddress -> new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(), Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version), diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index c0fd76d7f753b..0a89b2811b1a6 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -94,12 +94,15 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @@ -187,7 +190,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti } private MockTransportService buildService(final String name, final Version version, @Nullable ClusterSettings clusterSettings, - Settings settings, boolean acceptRequests, boolean doHandshake) { + Settings settings, boolean acceptRequests, boolean doHandshake, + TransportInterceptor interceptor) { Settings updatedSettings = Settings.builder() .put(TransportSettings.PORT.getKey(), getPortRange()) .put(settings) @@ -198,7 +202,7 @@ private MockTransportService buildService(final String name, final Version versi } Transport transport = build(updatedSettings, version, clusterSettings, doHandshake); MockTransportService service = MockTransportService.createNewService(updatedSettings, transport, version, threadPool, - clusterSettings, Collections.emptySet()); + clusterSettings, Collections.emptySet(), interceptor); service.start(); if (acceptRequests) { service.acceptIncomingRequests(); @@ -206,6 +210,11 @@ private MockTransportService buildService(final String name, final Version versi return service; } + private MockTransportService buildService(final String name, final Version version, @Nullable ClusterSettings clusterSettings, + Settings settings, boolean acceptRequests, boolean doHandshake) { + return buildService(name, version, clusterSettings, settings, acceptRequests, doHandshake, NOOP_TRANSPORT_INTERCEPTOR); + } + protected MockTransportService buildService(final String name, final Version version, Settings settings) { return buildService(name, version, null, settings); } @@ -2744,6 +2753,95 @@ public void onConnectionClosed(Transport.Connection connection) { } } + // test that the response handler is invoked on a failure to send + public void testFailToSend() throws InterruptedException { + final RuntimeException failToSendException; + if (randomBoolean()) { + failToSendException = new IllegalStateException("fail to send"); + } else { + failToSendException = new TransportException("fail to send"); + } + final TransportInterceptor interceptor = new TransportInterceptor() { + @Override + public AsyncSender interceptSender(final AsyncSender sender) { + return new AsyncSender() { + @Override + public void sendRequest( + final Transport.Connection connection, + final String action, + final TransportRequest request, + final TransportRequestOptions options, + final TransportResponseHandler handler) { + if ("fail-to-send-action".equals(action)) { + throw failToSendException; + } else { + sender.sendRequest(connection, action, request, options, handler); + } + } + }; + } + }; + try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, null, Settings.EMPTY, true, true, interceptor)) { + serviceC.start(); + serviceC.acceptIncomingRequests(); + final CountDownLatch latch = new CountDownLatch(1); + serviceC.connectToNode( + serviceA.getLocalDiscoNode(), + ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY), + new ActionListener<>() { + @Override + public void onResponse(final Void v) { + latch.countDown(); + } + + @Override + public void onFailure(final Exception e) { + fail(e.getMessage()); + } + }); + latch.await(); + final AtomicReference te = new AtomicReference<>(); + final Transport.Connection connection = serviceC.getConnection(nodeA); + serviceC.sendRequest( + connection, + "fail-to-send-action", + TransportRequest.Empty.INSTANCE, + TransportRequestOptions.EMPTY, + new TransportResponseHandler() { + @Override + public void handleResponse(final TransportResponse response) { + fail("handle response should not be invoked"); + } + + @Override + public void handleException(final TransportException exp) { + te.set(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public TransportResponse read(final StreamInput in) { + return TransportResponse.Empty.INSTANCE; + } + }); + assertThat(te.get(), not(nullValue())); + + if (failToSendException instanceof IllegalStateException) { + assertThat(te.get().getMessage(), equalTo("failure to send")); + assertThat(te.get().getCause(), instanceOf(IllegalStateException.class)); + assertThat(te.get().getCause().getMessage(), equalTo("fail to send")); + } else { + assertThat(te.get().getMessage(), equalTo("fail to send")); + assertThat(te.get().getCause(), nullValue()); + } + } + + } + private void closeConnectionChannel(Transport.Connection connection) { StubbableTransport.WrappedConnection wrappedConnection = (StubbableTransport.WrappedConnection) connection; TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) wrappedConnection.getConnection();