From 2c88a89c50da01c060c1b5e4faec4ee7a2cf098f Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 14 Sep 2023 17:35:06 +0100 Subject: [PATCH 01/41] Drop unnecessary exception mangling (#99560) Today the ESQL module uses `ListenableActionFuture` throughout, but this is just the same as `SubscribableListener` except for the way it mangles exceptions. The exception-mangling behaviour is unnecessary, so this commit removes it. --- .../action/support/SubscribableListener.java | 40 ++++++++++++++++++- .../support/SubscribableListenerTests.java | 17 ++++++++ .../compute/operator/AsyncOperator.java | 10 ++--- .../compute/operator/Driver.java | 20 +++++----- .../compute/operator/Operator.java | 12 ++---- .../operator/exchange/ExchangeBuffer.java | 20 +++++----- .../operator/exchange/ExchangeSink.java | 4 +- .../exchange/ExchangeSinkHandler.java | 9 ++--- .../exchange/ExchangeSinkOperator.java | 4 +- .../operator/exchange/ExchangeSource.java | 4 +- .../exchange/ExchangeSourceHandler.java | 8 ++-- .../exchange/ExchangeSourceOperator.java | 6 +-- .../compute/operator/AsyncOperatorTests.java | 4 +- .../exchange/ExchangeServiceTests.java | 4 +- .../xpack/esql/plugin/ComputeService.java | 4 +- 15 files changed, 106 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java index 96b54a951ccc9..5ba43111b4f03 100644 --- a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.ThreadPool; @@ -29,7 +30,7 @@ /** * An {@link ActionListener} to which other {@link ActionListener} instances can subscribe, such that when this listener is completed it * fans-out its result to the subscribed listeners. - * + *

* Similar to {@link ListenableActionFuture} and {@link ListenableFuture} except for its handling of exceptions: if this listener is * completed exceptionally then the exception is passed to subscribed listeners without modification. */ @@ -38,6 +39,41 @@ public class SubscribableListener implements ActionListener { private static final Logger logger = LogManager.getLogger(SubscribableListener.class); private static final Object EMPTY = new Object(); + /** + * Create a {@link SubscribableListener} which is incomplete. + */ + public SubscribableListener() { + this(EMPTY); + } + + /** + * Create a {@link SubscribableListener} which has already succeeded with the given result. + */ + public static SubscribableListener newSucceeded(T result) { + return new SubscribableListener<>(new SuccessResult<>(result)); + } + + /** + * Create a {@link SubscribableListener} which has already failed with the given exception. + */ + public static SubscribableListener newFailed(Exception exception) { + return new SubscribableListener<>(new FailureResult(exception, exception)); + } + + /** + * Create a {@link SubscribableListener}, fork a computation to complete it, and return the listener. If the forking itself throws an + * exception then the exception is caught and fed to the returned listener. + */ + public static SubscribableListener newForked(CheckedConsumer, ? extends Exception> fork) { + final var listener = new SubscribableListener(); + ActionListener.run(listener, fork::accept); + return listener; + } + + private SubscribableListener(Object initialState) { + state = initialState; + } + /** * If we are incomplete, {@code state} may be one of the following depending on how many waiting subscribers there are: *