diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java index b474cb67772a5..82b85329d9dbd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java @@ -991,20 +991,16 @@ private Scheduler.Cancellable sendTransportRequest logger.trace("Opened connection to {}, making transport request", masterEligibleNode); // If we don't get a response in 10 seconds that is a failure worth capturing on its own: final TimeValue transportTimeout = TimeValue.timeValueSeconds(10); - try { - transportService.sendRequest( - masterEligibleNode, - transportActionType.name(), - transportActionRequest, - TransportRequestOptions.timeout(transportTimeout), - new ActionListenerResponseHandler<>( - ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)), - transportActionType.getResponseReader() - ) - ); - } catch (Exception e) { - responseConsumer.accept(responseTransformationFunction.apply(null, e)); - } + transportService.sendRequest( + masterEligibleNode, + transportActionType.name(), + transportActionRequest, + TransportRequestOptions.timeout(transportTimeout), + new ActionListenerResponseHandler<>( + ActionListener.runBefore(fetchRemoteResultListener, () -> Releasables.close(releasable)), + transportActionType.getResponseReader() + ) + ); } }, e -> { logger.warn("Exception connecting to master masterEligibleNode", e); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java index 048b2e6665102..67dddfa6c247b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java @@ -423,24 +423,18 @@ private void sendClusterState( listener.onFailure(new IllegalStateException("serialized cluster state released before transmission")); return; } - try { - transportService.sendChildRequest( - destination, - PUBLISH_STATE_ACTION_NAME, - new BytesTransportRequest(bytes, destination.getVersion()), - task, - STATE_REQUEST_OPTIONS, - new ActionListenerResponseHandler<>( - ActionListener.runAfter(listener, bytes::decRef), - PublishWithJoinResponse::new, - ThreadPool.Names.CLUSTER_COORDINATION - ) - ); - } catch (Exception e) { - assert false : e; - logger.warn(() -> format("error sending cluster state to %s", destination), e); - listener.onFailure(e); - } + transportService.sendChildRequest( + destination, + PUBLISH_STATE_ACTION_NAME, + new BytesTransportRequest(bytes, destination.getVersion()), + task, + STATE_REQUEST_OPTIONS, + new ActionListenerResponseHandler<>( + ActionListener.runAfter(listener, bytes::decRef), + PublishWithJoinResponse::new, + ThreadPool.Names.CLUSTER_COORDINATION + ) + ); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 3a30cbb433506..1b7fe6b615073 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -716,9 +716,15 @@ public final void sendRequest( final Transport.Connection connection; try { connection = getConnection(node); - } catch (final NodeNotConnectedException ex) { - // the caller might not handle this so we invoke the handler - handler.handleException(ex); + } catch (TransportException transportException) { + // should only be a NodeNotConnectedException in practice, but handle all cases anyway to be sure + assert transportException instanceof NodeNotConnectedException : transportException; + handleSendRequestException(handler, transportException); + return; + } catch (Exception exception) { + // shouldn't happen in practice, but handle it anyway to be sure + assert false : exception; + handleSendRequestException(handler, new SendRequestTransportException(node, action, exception)); return; } sendRequest(connection, action, request, options, handler); @@ -776,25 +782,25 @@ public final void sendRequest( delegate = handler; } asyncSender.sendRequest(connection, action, request, options, delegate); - } catch (final Exception ex) { - handleSendRequestException(connection, action, handler, ex); + } catch (TransportException transportException) { + handleSendRequestException(handler, transportException); + } catch (Exception exception) { + handleSendRequestException(handler, new SendRequestTransportException(connection.getNode(), action, exception)); } } - private void handleSendRequestException( - Transport.Connection connection, - String action, + private static void handleSendRequestException( TransportResponseHandler handler, - Exception ex + TransportException transportException ) { - // the caller might not handle this so we invoke the handler - final TransportException te; - if (ex instanceof TransportException tex) { - te = tex; - } else { - te = new SendRequestTransportException(connection.getNode(), action, ex); + try { + handler.handleException(transportException); + } catch (Exception innerException) { + // should not happen + innerException.addSuppressed(transportException); + logger.error("unexpected exception from handler.handleException", innerException); + assert false : innerException; } - handler.handleException(te); } /** @@ -820,9 +826,15 @@ public final void sendChildRequest( final Transport.Connection connection; try { connection = getConnection(node); - } catch (final NodeNotConnectedException ex) { - // the caller might not handle this so we invoke the handler - handler.handleException(ex); + } catch (TransportException transportException) { + // should only be a NodeNotConnectedException in practice, but handle all cases anyway to be sure + assert transportException instanceof NodeNotConnectedException : transportException; + handleSendRequestException(handler, transportException); + return; + } catch (Exception exception) { + // shouldn't happen in practice, but handle it anyway to be sure + assert false : exception; + handleSendRequestException(handler, new SendRequestTransportException(node, action, exception)); return; } sendChildRequest(connection, action, request, parentTask, options, handler); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index b823a2b3f8b7c..a0c826dacbbee 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -3176,11 +3176,7 @@ public static Future submitRequest( responseListener.whenComplete(handler::handleResponse, e -> handler.handleException((TransportException) e)); final PlainActionFuture future = PlainActionFuture.newFuture(); responseListener.addListener(future); - try { - transportService.sendRequest(node, action, request, options, futureHandler); - } catch (NodeNotConnectedException ex) { - futureHandler.handleException(ex); - } + transportService.sendRequest(node, action, request, options, futureHandler); return future; } }