Skip to content

Commit

Permalink
Improve FutureUtils.get exception handling (#50339)
Browse files Browse the repository at this point in the history
FutureUtils.get() would unwrap ElasticsearchWrapperExceptions. This
is trappy, since nearly all usages of FutureUtils.get() expected only to
not have to deal with checked exceptions.

In particular, StepListener builds upon ListenableFuture which uses
FutureUtils.get to be informed about the exception passed to onFailure.
This had the bad consequence of masking away any exception that was an
ElasticsearchWrapperException like RemoteTransportException.
Specifically for recovery, this made CircuitBreakerExceptions happening
on the target node look like they originated from the source node.

The only usage that expected that behaviour was AdapterActionFuture.
The unwrap behaviour has been moved to that class.
  • Loading branch information
henningandersen authored Dec 20, 2019
1 parent e6d2b29 commit c82113e
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@

package org.elasticsearch.action.support;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;

import java.util.concurrent.TimeUnit;

public abstract class AdapterActionFuture<T, L> extends BaseFuture<T> implements ActionFuture<T>, ActionListener<L> {

@Override
public T actionGet() {
return FutureUtils.get(this);
try {
return FutureUtils.get(this);
} catch (ElasticsearchException e) {
throw unwrapEsException(e);
}
}

@Override
Expand All @@ -51,7 +57,11 @@ public T actionGet(TimeValue timeout) {

@Override
public T actionGet(long timeout, TimeUnit unit) {
return FutureUtils.get(this, timeout, unit);
try {
return FutureUtils.get(this, timeout, unit);
} catch (ElasticsearchException e) {
throw unwrapEsException(e);
}
}

@Override
Expand All @@ -66,4 +76,11 @@ public void onFailure(Exception e) {

protected abstract T convert(L listenerResponse);

private static RuntimeException unwrapEsException(ElasticsearchException esEx) {
Throwable root = esEx.unwrapCause();
if (root instanceof RuntimeException) {
return (RuntimeException) root;
}
return new UncategorizedExecutionException("Failed execution", root);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.cluster.action.index;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
Expand All @@ -31,7 +30,6 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.Mapping;
Expand Down Expand Up @@ -72,20 +70,16 @@ public void setClient(Client client) {
public void updateMappingOnMaster(Index index, Mapping mappingUpdate, ActionListener<Void> listener) {
client.preparePutMapping().setConcreteIndex(index).setSource(mappingUpdate.toString(), XContentType.JSON)
.setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO)
.execute(new ActionListener<AcknowledgedResponse>() {
.execute(new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
listener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(unwrapException(e));
listener.onFailure(e);
}
});
}

private static Exception unwrapException(Exception cause) {
return cause instanceof ElasticsearchException ? FutureUtils.unwrapEsException((ElasticsearchException) cause) : cause;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
Expand Down Expand Up @@ -86,21 +85,10 @@ public static <T> T get(Future<T> future, long timeout, TimeUnit unit) {
}

public static RuntimeException rethrowExecutionException(ExecutionException e) {
if (e.getCause() instanceof ElasticsearchException) {
ElasticsearchException esEx = (ElasticsearchException) e.getCause();
return unwrapEsException(esEx);
} else if (e.getCause() instanceof RuntimeException) {
if (e.getCause() instanceof RuntimeException) {
return (RuntimeException) e.getCause();
} else {
return new UncategorizedExecutionException("Failed execution", e);
}
}

public static RuntimeException unwrapEsException(ElasticsearchException esEx) {
Throwable root = esEx.unwrapCause();
if (root instanceof ElasticsearchException || root instanceof RuntimeException) {
return (RuntimeException) root;
}
return new UncategorizedExecutionException("Failed execution", root);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
import org.junit.After;
import org.junit.Before;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -110,4 +112,20 @@ private void executeAction(Runnable runnable) {
runnable.run();
}
}

/**
* This test checks that we no longer unwrap exceptions when using StepListener.
*/
public void testNoUnwrap() {
StepListener<String> step = new StepListener<>();
step.onFailure(new RemoteTransportException("test", new RuntimeException("expected")));
AtomicReference<RuntimeException> exception = new AtomicReference<>();
step.whenComplete(null, e -> {
exception.set((RuntimeException) e);
});

assertEquals(RemoteTransportException.class, exception.get().getClass());
RuntimeException e = expectThrows(RuntimeException.class, () -> step.result());
assertEquals(RemoteTransportException.class, e.getClass());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@

package org.elasticsearch.action.support;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteTransportException;

import java.util.Objects;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -90,4 +94,28 @@ protected String convert(final Integer listenerResponse) {
thread.join();
}

public void testUnwrapException() {
checkUnwrap(new RemoteTransportException("test", new RuntimeException()), RuntimeException.class, RemoteTransportException.class);
checkUnwrap(new RemoteTransportException("test", new Exception()),
UncategorizedExecutionException.class, RemoteTransportException.class);
checkUnwrap(new Exception(), UncategorizedExecutionException.class, Exception.class);
checkUnwrap(new ElasticsearchException("test", new Exception()), ElasticsearchException.class, ElasticsearchException.class);
}

private void checkUnwrap(Exception exception, Class<? extends Exception> actionGetException,
Class<? extends Exception> getException) {
final AdapterActionFuture<Void, Void> adapter = new AdapterActionFuture<Void, Void>() {
@Override
protected Void convert(Void listenerResponse) {
fail();
return null;
}
};

adapter.onFailure(exception);
assertEquals(actionGetException, expectThrows(RuntimeException.class, adapter::actionGet).getClass());
assertEquals(actionGetException, expectThrows(RuntimeException.class, () -> adapter.actionGet(10, TimeUnit.SECONDS)).getClass());
assertEquals(getException, expectThrows(ExecutionException.class, () -> adapter.get()).getCause().getClass());
assertEquals(getException, expectThrows(ExecutionException.class, () -> adapter.get(10, TimeUnit.SECONDS)).getCause().getClass());
}
}

0 comments on commit c82113e

Please sign in to comment.