Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ActionListener.map exception handling #50886

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,27 @@ static <Response> ActionListener<Response> wrap(Runnable runnable) {
* Creates a listener that wraps another listener, mapping response values via the given mapping function and passing along
* exceptions to the delegate.
*
* Notice that if the listener onResponse handler fails, the exception will bubble out, whereas if the function fails, the listeners
* onFailure handler will be called. The principle is that the code using this is responsible for the function, whereas the listener
* should do its own exception handling since it is a different component.
*
* @param listener Listener to delegate to
* @param fn Function to apply to listener response
* @param <Response> Response type of the new listener
* @param <T> Response type of the wrapped listener
* @return a listener that maps the received response and then passes it to its delegate listener
*/
static <T, Response> ActionListener<Response> map(ActionListener<T> listener, CheckedFunction<Response, T, Exception> fn) {
return wrap(r -> listener.onResponse(fn.apply(r)), listener::onFailure);
return delegateFailure(listener, (ActionListener<T> delegate, Response response) -> {
T mapped;
try {
mapped = fn.apply(response);
} catch (Exception e) {
delegate.onFailure(e);
return;
}
delegate.onResponse(mapped);
Copy link
Member

@original-brownbear original-brownbear Jan 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the point in this change, but note that I added the .map shortcut back when I added it to dry up a bunch of ActionListener.wrap(..., listener::onFailure) spots.
I think we'd basically have to audit every spot that we use .map in now and make sure that the listener/delegate will actually handle it's own onResponse failures (from a quick read over the spots we use map in this may already hold true).

Maybe we should assert this and do something like:

try {
   delegate.onResponse(mapped);
} catch (Exception e) {
   assert false: e;
   throw e;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the assert. Local CI seems happy about it.

});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,38 @@ public void testCompleteWith() {
assertThat(onFailureListener.isDone(), equalTo(true));
assertThat(expectThrows(ExecutionException.class, onFailureListener::get).getCause(), instanceOf(IOException.class));
}

public void testMap() {
AtomicReference<Exception> exReference = new AtomicReference<>();

ActionListener<String> listener = new ActionListener<>() {
@Override
public void onResponse(String s) {
if (s == null) {
throw new IllegalArgumentException("simulate onResponse exception");
}
}

@Override
public void onFailure(Exception e) {
exReference.set(e);
}
};
ActionListener<Boolean> mapped = ActionListener.map(listener, b -> {
if (b == null) {
return null;
} else if (b) {
throw new IllegalStateException("simulate map function exception");
} else {
return b.toString();
}
});

expectThrows(IllegalArgumentException.class, () -> mapped.onResponse(null));
assertNull(exReference.get());
mapped.onResponse(false);
assertNull(exReference.get());
mapped.onResponse(true);
assertThat(exReference.get(), instanceOf(IllegalStateException.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.elasticsearch.action.admin.indices.create;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
Expand All @@ -32,7 +35,9 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand All @@ -44,6 +49,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.MockLogAppender;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -379,4 +385,43 @@ public void testIndexNameInResponse() {
assertEquals("Should have index name in response", "foo", response.index());
}

henningandersen marked this conversation as resolved.
Show resolved Hide resolved
public void testOnResponseFailureOnMaster() throws Exception {
final String exceptionMessage = randomAlphaOfLength(10);
final Logger logger = LogManager.getLogger(ClusterApplierService.class);
final MockLogAppender appender = new MockLogAppender();
appender.start();
appender.addExpectation(
new MockLogAppender.ExceptionSeenEventExpectation(
getTestName(),
logger.getName(),
Level.WARN,
"failed to notify ClusterStateListener",
TestException.class,
exceptionMessage));
try {
Loggers.addAppender(logger, appender);

internalCluster().masterClient().admin().indices().prepareCreate("test").execute(new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse response) {
throw new TestException(exceptionMessage);
}

@Override
public void onFailure(Exception e) {
fail();
}
});
assertBusy(appender::assertAllExpectationsMatched);
} finally {
Loggers.removeAppender(logger, appender);
appender.stop();
}
}

private static class TestException extends RuntimeException {
TestException(String message) {
super(message);
}
}
}