From 0c6bf0fb7a930dae2726f0b840c8f975558ee750 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Wed, 18 Dec 2019 20:30:01 +0100 Subject: [PATCH 1/3] Improve ListenableFuture exception handling StepListener builds upon ListenableFuture, which used to use FutureUtils.get to be informed about the exception passed to onFailure. This had the bad consequence of masking away any exception that was an ElasticsearchWrapperException like RemoteTransportException. Specifically for recovery, this made CircuitBreakerExceptions happening on the target node look like they originated from the source node. --- .../elasticsearch/action/StepListener.java | 19 +++++++++++++++- .../util/concurrent/ListenableFuture.java | 22 ++++++++++++++++++- .../action/StepListenerTests.java | 15 +++++++++++++ 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/StepListener.java b/server/src/main/java/org/elasticsearch/action/StepListener.java index 43338d94de86d..cc324d931f3a9 100644 --- a/server/src/main/java/org/elasticsearch/action/StepListener.java +++ b/server/src/main/java/org/elasticsearch/action/StepListener.java @@ -19,12 +19,16 @@ package org.elasticsearch.action; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; /** @@ -85,6 +89,19 @@ public Response result() { if (delegate.isDone() == false) { throw new IllegalStateException("step is not completed yet"); } - return FutureUtils.get(delegate, 0L, TimeUnit.NANOSECONDS); // this future is done already - use a non-blocking method. + try { + // this future is done already - use a non-blocking method. + return ((Future) delegate).get(0L, TimeUnit.NANOSECONDS); + } catch (TimeoutException e) { + throw new ElasticsearchTimeoutException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Future got interrupted", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } + throw FutureUtils.rethrowExecutionException(e); + } } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java index 67088eac91f8d..d9d0a126513be 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ContextPreservingActionListener; @@ -26,8 +27,11 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * A future implementation that allows for the result to be passed to listeners waiting for @@ -108,7 +112,23 @@ private void notifyListener(ActionListener listener, ExecutorService executor protected void doRun() { // call get in a non-blocking fashion as we could be on a network thread // or another thread like the scheduler, which we should never block! - V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS); + V result; + try { + result = ((Future) ListenableFuture.this).get(0L, TimeUnit.NANOSECONDS); + } catch (TimeoutException e) { + throw new ElasticsearchTimeoutException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Future got interrupted", e); + } catch (ExecutionException e) { + // FutureUtils.rethrowExecutionException unwraps cause, which means the exception reported is not the same as + // the one given to onFailure. This masks away important debug info like RemoteTransportException. + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } + throw FutureUtils.rethrowExecutionException(e); + } + V value = result; listener.onResponse(value); } diff --git a/server/src/test/java/org/elasticsearch/action/StepListenerTests.java b/server/src/test/java/org/elasticsearch/action/StepListenerTests.java index 15e88830e47e9..d22f77d098ce6 100644 --- a/server/src/test/java/org/elasticsearch/action/StepListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/StepListenerTests.java @@ -22,11 +22,13 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteTransportException; import org.junit.After; import org.junit.Before; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; @@ -110,4 +112,17 @@ private void executeAction(Runnable runnable) { runnable.run(); } } + + public void testUnwrap() { + StepListener step = new StepListener<>(); + step.onFailure(new RemoteTransportException("test", new RuntimeException("expected"))); + AtomicReference exception = new AtomicReference<>(); + step.whenComplete(null, e -> { + exception.set((RuntimeException) e); + }); + + assertEquals(RemoteTransportException.class, exception.get().getClass()); + RuntimeException e = expectThrows(RuntimeException.class, () -> step.result()); + assertEquals(RemoteTransportException.class, e.getClass()); + } } From 3832c3b343c1566d5eedab04118346da4fe18a2e Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Thu, 19 Dec 2019 18:41:43 +0100 Subject: [PATCH 2/3] Fix FutureUtils.get instead. Now only do the unwrap when using AdapterActionFuture. --- .../elasticsearch/action/StepListener.java | 19 +------------ .../action/support/AdapterActionFuture.java | 21 ++++++++++++-- .../action/index/MappingUpdatedAction.java | 19 +------------ .../common/util/concurrent/FutureUtils.java | 14 +--------- .../util/concurrent/ListenableFuture.java | 22 +-------------- .../action/StepListenerTests.java | 5 +++- .../support/AdapterActionFutureTests.java | 28 +++++++++++++++++++ 7 files changed, 55 insertions(+), 73 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/StepListener.java b/server/src/main/java/org/elasticsearch/action/StepListener.java index cc324d931f3a9..43338d94de86d 100644 --- a/server/src/main/java/org/elasticsearch/action/StepListener.java +++ b/server/src/main/java/org/elasticsearch/action/StepListener.java @@ -19,16 +19,12 @@ package org.elasticsearch.action; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ListenableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.Consumer; /** @@ -89,19 +85,6 @@ public Response result() { if (delegate.isDone() == false) { throw new IllegalStateException("step is not completed yet"); } - try { - // this future is done already - use a non-blocking method. - return ((Future) delegate).get(0L, TimeUnit.NANOSECONDS); - } catch (TimeoutException e) { - throw new ElasticsearchTimeoutException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Future got interrupted", e); - } catch (ExecutionException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } - throw FutureUtils.rethrowExecutionException(e); - } + return FutureUtils.get(delegate, 0L, TimeUnit.NANOSECONDS); // this future is done already - use a non-blocking method. } } diff --git a/server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java index 528750ba89b90..c37e89b8396b8 100644 --- a/server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java +++ b/server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java @@ -19,11 +19,13 @@ package org.elasticsearch.action.support; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import java.util.concurrent.TimeUnit; @@ -31,7 +33,11 @@ public abstract class AdapterActionFuture extends BaseFuture implements @Override public T actionGet() { - return FutureUtils.get(this); + try { + return FutureUtils.get(this); + } catch (ElasticsearchException e) { + throw unwrapEsException(e); + } } @Override @@ -51,7 +57,11 @@ public T actionGet(TimeValue timeout) { @Override public T actionGet(long timeout, TimeUnit unit) { - return FutureUtils.get(this, timeout, unit); + try { + return FutureUtils.get(this, timeout, unit); + } catch (ElasticsearchException e) { + throw unwrapEsException(e); + } } @Override @@ -66,4 +76,11 @@ public void onFailure(Exception e) { protected abstract T convert(L listenerResponse); + private static RuntimeException unwrapEsException(ElasticsearchException esEx) { + Throwable root = esEx.unwrapCause(); + if (root instanceof RuntimeException) { + return (RuntimeException) root; + } + return new UncategorizedExecutionException("Failed execution", root); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index c25183ece82e9..56b2cd3a337af 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -19,9 +19,7 @@ package org.elasticsearch.cluster.action.index; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; @@ -31,7 +29,6 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.Mapping; @@ -72,20 +69,6 @@ public void setClient(Client client) { public void updateMappingOnMaster(Index index, Mapping mappingUpdate, ActionListener listener) { client.preparePutMapping().setConcreteIndex(index).setSource(mappingUpdate.toString(), XContentType.JSON) .setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO) - .execute(new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - listener.onResponse(null); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(unwrapException(e)); - } - }); - } - - private static Exception unwrapException(Exception cause) { - return cause instanceof ElasticsearchException ? FutureUtils.unwrapEsException((ElasticsearchException) cause) : cause; + .execute(ActionListener.map(listener, r -> null)); } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java index 15e26779071ec..236dc9c716266 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java @@ -19,7 +19,6 @@ package org.elasticsearch.common.util.concurrent; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; @@ -86,21 +85,10 @@ public static T get(Future future, long timeout, TimeUnit unit) { } public static RuntimeException rethrowExecutionException(ExecutionException e) { - if (e.getCause() instanceof ElasticsearchException) { - ElasticsearchException esEx = (ElasticsearchException) e.getCause(); - return unwrapEsException(esEx); - } else if (e.getCause() instanceof RuntimeException) { + if (e.getCause() instanceof RuntimeException) { return (RuntimeException) e.getCause(); } else { return new UncategorizedExecutionException("Failed execution", e); } } - - public static RuntimeException unwrapEsException(ElasticsearchException esEx) { - Throwable root = esEx.unwrapCause(); - if (root instanceof ElasticsearchException || root instanceof RuntimeException) { - return (RuntimeException) root; - } - return new UncategorizedExecutionException("Failed execution", root); - } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java index d9d0a126513be..67088eac91f8d 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java @@ -19,7 +19,6 @@ package org.elasticsearch.common.util.concurrent; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ContextPreservingActionListener; @@ -27,11 +26,8 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * A future implementation that allows for the result to be passed to listeners waiting for @@ -112,23 +108,7 @@ private void notifyListener(ActionListener listener, ExecutorService executor protected void doRun() { // call get in a non-blocking fashion as we could be on a network thread // or another thread like the scheduler, which we should never block! - V result; - try { - result = ((Future) ListenableFuture.this).get(0L, TimeUnit.NANOSECONDS); - } catch (TimeoutException e) { - throw new ElasticsearchTimeoutException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Future got interrupted", e); - } catch (ExecutionException e) { - // FutureUtils.rethrowExecutionException unwraps cause, which means the exception reported is not the same as - // the one given to onFailure. This masks away important debug info like RemoteTransportException. - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } - throw FutureUtils.rethrowExecutionException(e); - } - V value = result; + V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS); listener.onResponse(value); } diff --git a/server/src/test/java/org/elasticsearch/action/StepListenerTests.java b/server/src/test/java/org/elasticsearch/action/StepListenerTests.java index d22f77d098ce6..df57b30c52276 100644 --- a/server/src/test/java/org/elasticsearch/action/StepListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/StepListenerTests.java @@ -113,7 +113,10 @@ private void executeAction(Runnable runnable) { } } - public void testUnwrap() { + /** + * This test checks that we no longer unwrap exceptions when using StepListener. + */ + public void testNoUnwrap() { StepListener step = new StepListener<>(); step.onFailure(new RemoteTransportException("test", new RuntimeException("expected"))); AtomicReference exception = new AtomicReference<>(); diff --git a/server/src/test/java/org/elasticsearch/action/support/AdapterActionFutureTests.java b/server/src/test/java/org/elasticsearch/action/support/AdapterActionFutureTests.java index a7405ddae8cce..b2c6a8c5ba2aa 100644 --- a/server/src/test/java/org/elasticsearch/action/support/AdapterActionFutureTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/AdapterActionFutureTests.java @@ -19,12 +19,16 @@ package org.elasticsearch.action.support; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.RemoteTransportException; import java.util.Objects; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -90,4 +94,28 @@ protected String convert(final Integer listenerResponse) { thread.join(); } + public void testUnwrapException() { + checkUnwrap(new RemoteTransportException("test", new RuntimeException()), RuntimeException.class, RemoteTransportException.class); + checkUnwrap(new RemoteTransportException("test", new Exception()), + UncategorizedExecutionException.class, RemoteTransportException.class); + checkUnwrap(new Exception(), UncategorizedExecutionException.class, Exception.class); + checkUnwrap(new ElasticsearchException("test", new Exception()), ElasticsearchException.class, ElasticsearchException.class); + } + + private void checkUnwrap(Exception exception, Class actionGetException, + Class getException) { + final AdapterActionFuture adapter = new AdapterActionFuture() { + @Override + protected Void convert(Void listenerResponse) { + fail(); + return null; + } + }; + + adapter.onFailure(exception); + assertEquals(actionGetException, expectThrows(RuntimeException.class, adapter::actionGet).getClass()); + assertEquals(actionGetException, expectThrows(RuntimeException.class, () -> adapter.actionGet(10, TimeUnit.SECONDS)).getClass()); + assertEquals(getException, expectThrows(ExecutionException.class, () -> adapter.get()).getCause().getClass()); + assertEquals(getException, expectThrows(ExecutionException.class, () -> adapter.get(10, TimeUnit.SECONDS)).getCause().getClass()); + } } From c238bee060dcccb233e360985fa9e051634ad11a Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Thu, 19 Dec 2019 19:46:51 +0100 Subject: [PATCH 3/3] Do not use ActionListener.map --- .../cluster/action/index/MappingUpdatedAction.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index 56b2cd3a337af..874b5eb00646c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.action.index; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; @@ -69,6 +70,16 @@ public void setClient(Client client) { public void updateMappingOnMaster(Index index, Mapping mappingUpdate, ActionListener listener) { client.preparePutMapping().setConcreteIndex(index).setSource(mappingUpdate.toString(), XContentType.JSON) .setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO) - .execute(ActionListener.map(listener, r -> null)); + .execute(new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + listener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } }