Skip to content

Commit

Permalink
Clarify that TransportService#sendRequest never throws (#89298)
Browse files Browse the repository at this point in the history
It's not obvious from reading the code that
`TransportService#sendRequest` and friends always catch exceptions and
pass them to the response handler, which means some callers are wrapping
calls to `sendRequest` in their own unnecessary try/catch blocks. This
commit makes it clear that all exceptions are handled and removes the
unnecessary exception handling in callers.

Closes #89274
  • Loading branch information
DaveCTurner authored Aug 12, 2022
1 parent 654f31d commit ed940b6
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -991,20 +991,16 @@ private <R extends ActionResponse, T> Scheduler.Cancellable sendTransportRequest
logger.trace("Opened connection to {}, making transport request", masterEligibleNode);
// If we don't get a response in 10 seconds that is a failure worth capturing on its own:
final TimeValue transportTimeout = TimeValue.timeValueSeconds(10);
try {
transportService.sendRequest(
masterEligibleNode,
transportActionType.name(),
transportActionRequest,
TransportRequestOptions.timeout(transportTimeout),
new ActionListenerResponseHandler<>(
ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)),
transportActionType.getResponseReader()
)
);
} catch (Exception e) {
responseConsumer.accept(responseTransformationFunction.apply(null, e));
}
transportService.sendRequest(
masterEligibleNode,
transportActionType.name(),
transportActionRequest,
TransportRequestOptions.timeout(transportTimeout),
new ActionListenerResponseHandler<>(
ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)),
transportActionType.getResponseReader()
)
);
}
}, e -> {
logger.warn("Exception connecting to master masterEligibleNode", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,24 +423,18 @@ private void sendClusterState(
listener.onFailure(new IllegalStateException("serialized cluster state released before transmission"));
return;
}
try {
transportService.sendChildRequest(
destination,
PUBLISH_STATE_ACTION_NAME,
new BytesTransportRequest(bytes, destination.getVersion()),
task,
STATE_REQUEST_OPTIONS,
new ActionListenerResponseHandler<>(
ActionListener.runAfter(listener, bytes::decRef),
PublishWithJoinResponse::new,
ThreadPool.Names.CLUSTER_COORDINATION
)
);
} catch (Exception e) {
assert false : e;
logger.warn(() -> format("error sending cluster state to %s", destination), e);
listener.onFailure(e);
}
transportService.sendChildRequest(
destination,
PUBLISH_STATE_ACTION_NAME,
new BytesTransportRequest(bytes, destination.getVersion()),
task,
STATE_REQUEST_OPTIONS,
new ActionListenerResponseHandler<>(
ActionListener.runAfter(listener, bytes::decRef),
PublishWithJoinResponse::new,
ThreadPool.Names.CLUSTER_COORDINATION
)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,9 +716,15 @@ public final <T extends TransportResponse> void sendRequest(
final Transport.Connection connection;
try {
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
} catch (TransportException transportException) {
// should only be a NodeNotConnectedException in practice, but handle all cases anyway to be sure
assert transportException instanceof NodeNotConnectedException : transportException;
handleSendRequestException(handler, transportException);
return;
} catch (Exception exception) {
// shouldn't happen in practice, but handle it anyway to be sure
assert false : exception;
handleSendRequestException(handler, new SendRequestTransportException(node, action, exception));
return;
}
sendRequest(connection, action, request, options, handler);
Expand Down Expand Up @@ -776,25 +782,25 @@ public final <T extends TransportResponse> void sendRequest(
delegate = handler;
}
asyncSender.sendRequest(connection, action, request, options, delegate);
} catch (final Exception ex) {
handleSendRequestException(connection, action, handler, ex);
} catch (TransportException transportException) {
handleSendRequestException(handler, transportException);
} catch (Exception exception) {
handleSendRequestException(handler, new SendRequestTransportException(connection.getNode(), action, exception));
}
}

private <T extends TransportResponse> void handleSendRequestException(
Transport.Connection connection,
String action,
private static <T extends TransportResponse> void handleSendRequestException(
TransportResponseHandler<T> handler,
Exception ex
TransportException transportException
) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException tex) {
te = tex;
} else {
te = new SendRequestTransportException(connection.getNode(), action, ex);
try {
handler.handleException(transportException);
} catch (Exception innerException) {
// should not happen
innerException.addSuppressed(transportException);
logger.error("unexpected exception from handler.handleException", innerException);
assert false : innerException;
}
handler.handleException(te);
}

/**
Expand All @@ -820,9 +826,15 @@ public final <T extends TransportResponse> void sendChildRequest(
final Transport.Connection connection;
try {
connection = getConnection(node);
} catch (final NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
} catch (TransportException transportException) {
// should only be a NodeNotConnectedException in practice, but handle all cases anyway to be sure
assert transportException instanceof NodeNotConnectedException : transportException;
handleSendRequestException(handler, transportException);
return;
} catch (Exception exception) {
// shouldn't happen in practice, but handle it anyway to be sure
assert false : exception;
handleSendRequestException(handler, new SendRequestTransportException(node, action, exception));
return;
}
sendChildRequest(connection, action, request, parentTask, options, handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3176,11 +3176,7 @@ public static <T extends TransportResponse> Future<T> submitRequest(
responseListener.whenComplete(handler::handleResponse, e -> handler.handleException((TransportException) e));
final PlainActionFuture<T> future = PlainActionFuture.newFuture();
responseListener.addListener(future);
try {
transportService.sendRequest(node, action, request, options, futureHandler);
} catch (NodeNotConnectedException ex) {
futureHandler.handleException(ex);
}
transportService.sendRequest(node, action, request, options, futureHandler);
return future;
}
}

0 comments on commit ed940b6

Please sign in to comment.