Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Invoke response handler on failure to send #53631

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -519,37 +518,57 @@ public void removeConnectionListener(TransportConnectionListener listener) {
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportResponseHandler<T> handler) {
final Transport.Connection connection;
try {
Transport.Connection connection = getConnection(node);
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
} catch (NodeNotConnectedException ex) {
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
return;
}
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
}

public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
final Transport.Connection connection;
try {
Transport.Connection connection = getConnection(node);
sendRequest(connection, action, request, options, handler);
} catch (NodeNotConnectedException ex) {
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
return;
}
sendRequest(connection, action, request, options, handler);
}

/**
* Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
*
* @param connection the connection to send the request on
* @param action the name of the action
* @param request the request
* @param options the options for this request
* @param handler the response handler
* @param <T> the type of the transport response
*/
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
try {
asyncSender.sendRequest(connection, action, request, options, handler);
} catch (NodeNotConnectedException ex) {
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
}
handler.handleException(te);
}
}

Expand All @@ -569,13 +588,15 @@ public final <T extends TransportResponse> void sendChildRequest(final Discovery
final TransportRequest request, final Task parentTask,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
final Transport.Connection connection;
try {
Transport.Connection connection = getConnection(node);
sendChildRequest(connection, action, request, parentTask, options, handler);
} catch (NodeNotConnectedException ex) {
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
return;
}
sendChildRequest(connection, action, request, parentTask, options, handler);
}

public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
Expand All @@ -589,16 +610,7 @@ public <T extends TransportResponse> void sendChildRequest(final Transport.Conne
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
request.setParentTask(localNode.getId(), parentTask.getId());
try {
sendRequest(connection, action, request, options, handler);
} catch (TaskCancelledException ex) {
// The parent task is already cancelled - just fail the request
handler.handleException(new TransportException(ex));
} catch (NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
}

sendRequest(connection, action, request, options, handler);
}

private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,13 @@ public static MockNioTransport newMockTransport(Settings settings, Version versi

public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
return createNewService(settings, transport, version, threadPool, clusterSettings, taskHeaders, NOOP_TRANSPORT_INTERCEPTOR);
}

public static MockTransportService createNewService(Settings settings, Transport transport, Version version, ThreadPool threadPool,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders,
TransportInterceptor interceptor) {
return new MockTransportService(settings, transport, threadPool, interceptor,
boundAddress ->
new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), UUIDs.randomBase64UUID(), boundAddress.publishAddress(),
Node.NODE_ATTRIBUTES.getAsMap(settings), DiscoveryNode.getRolesFromSettings(settings), version),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,15 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
Expand Down Expand Up @@ -187,7 +190,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
}

private MockTransportService buildService(final String name, final Version version, @Nullable ClusterSettings clusterSettings,
Settings settings, boolean acceptRequests, boolean doHandshake) {
Settings settings, boolean acceptRequests, boolean doHandshake,
TransportInterceptor interceptor) {
Settings updatedSettings = Settings.builder()
.put(TransportSettings.PORT.getKey(), getPortRange())
.put(settings)
Expand All @@ -198,14 +202,19 @@ private MockTransportService buildService(final String name, final Version versi
}
Transport transport = build(updatedSettings, version, clusterSettings, doHandshake);
MockTransportService service = MockTransportService.createNewService(updatedSettings, transport, version, threadPool,
clusterSettings, Collections.emptySet());
clusterSettings, Collections.emptySet(), interceptor);
service.start();
if (acceptRequests) {
service.acceptIncomingRequests();
}
return service;
}

private MockTransportService buildService(final String name, final Version version, @Nullable ClusterSettings clusterSettings,
Settings settings, boolean acceptRequests, boolean doHandshake) {
return buildService(name, version, clusterSettings, settings, acceptRequests, doHandshake, NOOP_TRANSPORT_INTERCEPTOR);
}

protected MockTransportService buildService(final String name, final Version version, Settings settings) {
return buildService(name, version, null, settings);
}
Expand Down Expand Up @@ -2744,6 +2753,95 @@ public void onConnectionClosed(Transport.Connection connection) {
}
}

// test that the response handler is invoked on a failure to send
public void testFailToSend() throws InterruptedException {
final RuntimeException failToSendException;
if (randomBoolean()) {
failToSendException = new IllegalStateException("fail to send");
} else {
failToSendException = new TransportException("fail to send");
}
final TransportInterceptor interceptor = new TransportInterceptor() {
@Override
public AsyncSender interceptSender(final AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
final Transport.Connection connection,
final String action,
final TransportRequest request,
final TransportRequestOptions options,
final TransportResponseHandler<T> handler) {
if ("fail-to-send-action".equals(action)) {
throw failToSendException;
} else {
sender.sendRequest(connection, action, request, options, handler);
}
}
};
}
};
try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, null, Settings.EMPTY, true, true, interceptor)) {
serviceC.start();
serviceC.acceptIncomingRequests();
final CountDownLatch latch = new CountDownLatch(1);
serviceC.connectToNode(
serviceA.getLocalDiscoNode(),
ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY),
new ActionListener<>() {
@Override
public void onResponse(final Void v) {
latch.countDown();
}

@Override
public void onFailure(final Exception e) {
fail(e.getMessage());
}
});
latch.await();
final AtomicReference<TransportException> te = new AtomicReference<>();
final Transport.Connection connection = serviceC.getConnection(nodeA);
serviceC.sendRequest(
connection,
"fail-to-send-action",
TransportRequest.Empty.INSTANCE,
TransportRequestOptions.EMPTY,
new TransportResponseHandler<TransportResponse>() {
@Override
public void handleResponse(final TransportResponse response) {
fail("handle response should not be invoked");
}

@Override
public void handleException(final TransportException exp) {
te.set(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public TransportResponse read(final StreamInput in) {
return TransportResponse.Empty.INSTANCE;
}
});
assertThat(te.get(), not(nullValue()));

if (failToSendException instanceof IllegalStateException) {
assertThat(te.get().getMessage(), equalTo("failure to send"));
assertThat(te.get().getCause(), instanceOf(IllegalStateException.class));
assertThat(te.get().getCause().getMessage(), equalTo("fail to send"));
} else {
assertThat(te.get().getMessage(), equalTo("fail to send"));
assertThat(te.get().getCause(), nullValue());
}
}

}

private void closeConnectionChannel(Transport.Connection connection) {
StubbableTransport.WrappedConnection wrappedConnection = (StubbableTransport.WrappedConnection) connection;
TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) wrappedConnection.getConnection();
Expand Down