From 4e3d7eeca261b35f20500a1f1404859fc19117bf Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 16 Mar 2020 21:27:02 -0400 Subject: [PATCH] Invoke response handler on failure to send (#53631) Today it can happen that a transport message fails to send (for example, because a transport interceptor rejects the request). In this case, the response handler is never invoked, which can lead to necessary cleanups not being performed. There are two ways to handle this. One is to expect every callsite that sends a message to try/catch these exceptions and handle them appropriately. The other is merely to invoke the response handler to handle the exception, which is already equipped to handle transport exceptions. --- .../netty4/SimpleNetty4TransportTests.java | 27 +++-- .../transport/TransportService.java | 56 ++++++----- .../test/transport/MockTransportService.java | 8 +- .../AbstractSimpleTransportTestCase.java | 98 ++++++++++++++++++- .../transport/MockTcpTransportTests.java | 13 ++- ...stractSimpleSecurityTransportTestCase.java | 3 +- 6 files changed, 169 insertions(+), 36 deletions(-) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index 10c91b4e8d7da..c03f6a2208ff9 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportSettings; import java.net.InetAddress; @@ -46,12 +47,14 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.containsString; public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase { public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version, - ClusterSettings clusterSettings, boolean doHandshake) { + ClusterSettings clusterSettings, boolean doHandshake, + TransportInterceptor interceptor) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); Transport transport = new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { @@ -66,16 +69,28 @@ public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionP } } }; - MockTransportService mockTransportService = - MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); + MockTransportService mockTransportService = MockTransportService.createNewService( + settings, + transport, + version, + threadPool, + clusterSettings, + Collections.emptySet(), + interceptor); mockTransportService.start(); return mockTransportService; } @Override - protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) { + protected MockTransportService build( + Settings settings, + Version version, + ClusterSettings clusterSettings, + boolean doHandshake, + TransportInterceptor interceptor) { settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build(); - MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake); + MockTransportService transportService = + nettyFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake, interceptor); transportService.start(); return transportService; } @@ -103,7 +118,7 @@ public void testBindUnavailableAddress() { ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); BindTransportException bindTransportException = expectThrows(BindTransportException.class, () -> { MockTransportService transportService = - nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings, true); + nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings, true, NOOP_TRANSPORT_INTERCEPTOR); try { transportService.start(); } finally { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 0fe7f4ce933ab..1a888c99b1814 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -48,7 +48,6 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -512,37 +511,57 @@ public TransportFuture submitRequest(DiscoveryN public void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, final TransportResponseHandler handler) { + final Transport.Connection connection; try { - Transport.Connection connection = getConnection(node); - sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler); - } catch (NodeNotConnectedException ex) { + connection = getConnection(node); + } catch (final NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler handler.handleException(ex); + return; } + sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler); } public final void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { + final Transport.Connection connection; try { - Transport.Connection connection = getConnection(node); - sendRequest(connection, action, request, options, handler); - } catch (NodeNotConnectedException ex) { + connection = getConnection(node); + } catch (final NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler handler.handleException(ex); + return; } + sendRequest(connection, action, request, options, handler); } + /** + * Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked. + * + * @param connection the connection to send the request on + * @param action the name of the action + * @param request the request + * @param options the options for this request + * @param handler the response handler + * @param the type of the transport response + */ public final void sendRequest(final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { try { asyncSender.sendRequest(connection, action, request, options, handler); - } catch (NodeNotConnectedException ex) { + } catch (final Exception ex) { // the caller might not handle this so we invoke the handler - handler.handleException(ex); + final TransportException te; + if (ex instanceof TransportException) { + te = (TransportException) ex; + } else { + te = new TransportException("failure to send", ex); + } + handler.handleException(te); } } @@ -562,13 +581,15 @@ public final void sendChildRequest(final Discovery final TransportRequest request, final Task parentTask, final TransportRequestOptions options, final TransportResponseHandler handler) { + final Transport.Connection connection; try { - Transport.Connection connection = getConnection(node); - sendChildRequest(connection, action, request, parentTask, options, handler); - } catch (NodeNotConnectedException ex) { + connection = getConnection(node); + } catch (final NodeNotConnectedException ex) { // the caller might not handle this so we invoke the handler handler.handleException(ex); + return; } + sendChildRequest(connection, action, request, parentTask, options, handler); } public void sendChildRequest(final Transport.Connection connection, final String action, @@ -582,16 +603,7 @@ public void sendChildRequest(final Transport.Conne final TransportRequestOptions options, final TransportResponseHandler handler) { request.setParentTask(localNode.getId(), parentTask.getId()); - try { - sendRequest(connection, action, request, options, handler); - } catch (TaskCancelledException ex) { - // The parent task is already cancelled - just fail the request - handler.handleException(new TransportException(ex)); - } catch (NodeNotConnectedException ex) { - // the caller might not handle this so we invoke the handler - handler.handleException(ex); - } - + sendRequest(connection, action, request, options, handler); } private void sendRequestInternal(final Transport.Connection connection, final String action, diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index e5fbee8c59212..49d48c3d352da 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -117,7 +117,13 @@ public static MockTcpTransport newMockTransport(Settings settings, Version versi public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, @Nullable ClusterSettings clusterSettings, Set taskHeaders) { - return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, + return createNewService(settings, transport, version, threadPool, clusterSettings, taskHeaders, NOOP_TRANSPORT_INTERCEPTOR); + } + + public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool, + @Nullable ClusterSettings clusterSettings, Set taskHeaders, + TransportInterceptor interceptor) { + return new MockTransportService(settings, transport, threadPool, interceptor, boundAddress -> new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(), Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version), diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index f19f7e090ae97..3828a0fda5788 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -87,12 +87,15 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @@ -111,7 +114,16 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { protected volatile DiscoveryNode nodeB; protected volatile MockTransportService serviceB; - protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake); + private MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) { + return build(settings, version, clusterSettings, doHandshake, NOOP_TRANSPORT_INTERCEPTOR); + } + + protected abstract MockTransportService build( + Settings settings, + Version version, + ClusterSettings clusterSettings, + boolean doHandshake, + TransportInterceptor interceptor); protected int channelsPerNodeConnection() { // This is a customized profile for this test case. @@ -165,6 +177,12 @@ public void onNodeDisconnected(DiscoveryNode node) { private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings, Settings settings, boolean acceptRequests, boolean doHandshake) { + return buildService(name, version, clusterSettings, settings, acceptRequests, doHandshake, NOOP_TRANSPORT_INTERCEPTOR); + } + + private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings, + Settings settings, boolean acceptRequests, boolean doHandshake, + TransportInterceptor interceptor) { MockTransportService service = build( Settings.builder() .put(settings) @@ -173,7 +191,7 @@ private MockTransportService buildService(final String name, final Version versi .put(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .build(), version, - clusterSettings, doHandshake); + clusterSettings, doHandshake, interceptor); if (acceptRequests) { service.acceptIncomingRequests(); } @@ -186,7 +204,7 @@ protected MockTransportService buildService(final String name, final Version ver protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings, Settings settings) { - return buildService(name, version, clusterSettings, settings, true, true); + return buildService(name, version, clusterSettings, settings, true, true, NOOP_TRANSPORT_INTERCEPTOR); } @Override @@ -2731,6 +2749,80 @@ public void onConnectionClosed(Transport.Connection connection) { } } + // test that the response handler is invoked on a failure to send + public void testFailToSend() throws InterruptedException { + final RuntimeException failToSendException; + if (randomBoolean()) { + failToSendException = new IllegalStateException("fail to send"); + } else { + failToSendException = new TransportException("fail to send"); + } + final TransportInterceptor interceptor = new TransportInterceptor() { + @Override + public AsyncSender interceptSender(final AsyncSender sender) { + return new AsyncSender() { + @Override + public void sendRequest( + final Transport.Connection connection, + final String action, + final TransportRequest request, + final TransportRequestOptions options, + final TransportResponseHandler handler) { + if ("fail-to-send-action".equals(action)) { + throw failToSendException; + } else { + sender.sendRequest(connection, action, request, options, handler); + } + } + }; + } + }; + try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, null, Settings.EMPTY, true, true, interceptor)) { + serviceC.start(); + serviceC.acceptIncomingRequests(); + serviceC.connectToNode(serviceA.getLocalDiscoNode(), ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY)); + final AtomicReference te = new AtomicReference<>(); + final Transport.Connection connection = serviceC.getConnection(nodeA); + serviceC.sendRequest( + connection, + "fail-to-send-action", + TransportRequest.Empty.INSTANCE, + TransportRequestOptions.EMPTY, + new TransportResponseHandler() { + @Override + public void handleResponse(final TransportResponse response) { + fail("handle response should not be invoked"); + } + + @Override + public void handleException(final TransportException exp) { + te.set(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public TransportResponse read(final StreamInput in) { + return TransportResponse.Empty.INSTANCE; + } + }); + assertThat(te.get(), not(nullValue())); + + if (failToSendException instanceof IllegalStateException) { + assertThat(te.get().getMessage(), equalTo("failure to send")); + assertThat(te.get().getCause(), instanceOf(IllegalStateException.class)); + assertThat(te.get().getCause().getMessage(), equalTo("fail to send")); + } else { + assertThat(te.get().getMessage(), equalTo("fail to send")); + assertThat(te.get().getCause(), nullValue()); + } + } + + } + private void closeConnectionChannel(Transport.Connection connection) { StubbableTransport.WrappedConnection wrappedConnection = (StubbableTransport.WrappedConnection) connection; TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) wrappedConnection.getConnection(); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index 5c7fdc10649b5..a585c01b59dbe 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -34,7 +34,8 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase { @Override - protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) { + protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake, + TransportInterceptor interceptor) { NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version) { @@ -49,8 +50,14 @@ public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionP } } }; - MockTransportService mockTransportService = - MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); + MockTransportService mockTransportService = MockTransportService.createNewService( + settings, + transport, + version, + threadPool, + clusterSettings, + Collections.emptySet(), + interceptor); mockTransportService.start(); return mockTransportService; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java index f07cc3d58e915..b4285e324c6ff 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java @@ -42,6 +42,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; @@ -103,7 +104,7 @@ public void testBindUnavailableAddress() { .build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); BindTransportException bindTransportException = expectThrows(BindTransportException.class, () -> { - MockTransportService transportService = build(settings, Version.CURRENT, clusterSettings, true); + MockTransportService transportService = build(settings, Version.CURRENT, clusterSettings, true, NOOP_TRANSPORT_INTERCEPTOR); try { transportService.start(); } finally {