Skip to content

Commit

Permalink
[ML] Unwrap exception causes before calling instanceof (#47676)
Browse files Browse the repository at this point in the history
When exceptions could be returned from another node, the exception
might be wrapped in a `RemoteTransportException`. In places where
we handled specific exceptions using `instanceof` we ought to unwrap
the cause first.

This commit attempts to fix this issue after searching code in the ML
plugin.
  • Loading branch information
dimitris-athanasiou authored Oct 8, 2019
1 parent 7b652ad commit 3bf141d
Show file tree
Hide file tree
Showing 31 changed files with 64 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;
import java.util.SortedMap;
Expand Down Expand Up @@ -84,7 +85,7 @@ public static void createAnnotationsIndexIfNecessary(Settings settings, Client c
e -> {
// Possible that the index was created while the request was executing,
// so we need to handle that possibility
if (e instanceof ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
// Create the alias
createAliasListener.onResponse(true);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -133,7 +134,7 @@ public static void createStateIndexAndAliasIfNecessary(Client client, ClusterSta
// If it was created between our last check, and this request being handled, we should add the alias
// Adding an alias that already exists is idempotent. So, no need to double check if the alias exists
// as well.
if (createIndexFailure instanceof ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(createIndexFailure) instanceof ResourceAlreadyExistsException) {
createAliasListener.onResponse(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
} else {
finalListener.onFailure(createIndexFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,8 @@ public static <T> T requireNonNull(T obj, String paramName) {
public static <T> T requireNonNull(T obj, ParseField paramName) {
return requireNonNull(obj, paramName.getPreferredName());
}

public static Throwable unwrapCause(Throwable t) {
return org.elasticsearch.ExceptionsHelper.unwrapCause(t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
Expand Down Expand Up @@ -467,7 +468,7 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED);
},
e -> {
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
// the snapshot already exists
listener.onResponse(Boolean.TRUE);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
@Override
public void onFailure(Exception e) {
final int slot = counter.incrementAndGet();
if ((e instanceof ResourceNotFoundException &&
if ((ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException &&
Strings.isAllOrWildcard(new String[]{request.getJobId()})) == false) {
failures.set(slot - 1, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persisten

@Override
public void onFailure(Exception e) {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
// the task has been removed in between
listener.onResponse(true);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ private void deleteQuantiles(ParentTaskAssigningClient parentTaskClient, String
response -> finishedHandler.onResponse(true),
e -> {
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
if (e instanceof IndexNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
finishedHandler.onResponse(true);
} else {
finishedHandler.onFailure(e);
Expand Down Expand Up @@ -466,7 +466,7 @@ private void deleteCategorizerState(ParentTaskAssigningClient parentTaskClient,
},
e -> {
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
if (e instanceof IndexNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
finishedHandler.onResponse(true);
} else {
finishedHandler.onFailure(e);
Expand Down Expand Up @@ -536,7 +536,7 @@ public void onResponse(Boolean response) {

@Override
public void onFailure(Exception e) {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
normalDeleteJob(parentTaskClient, request, listener);
} else {
listener.onFailure(e);
Expand All @@ -549,7 +549,7 @@ public void onFailure(Exception e) {
ActionListener<KillProcessAction.Response> killJobListener = ActionListener.wrap(
response -> removePersistentTask(request.getJobId(), state, removeTaskListener),
e -> {
if (e instanceof ElasticsearchStatusException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ElasticsearchStatusException) {
// Killing the process marks the task as completed so it
// may have disappeared when we get here
removePersistentTask(request.getJobId(), state, removeTaskListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<OpenJobActio

@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
e = new ElasticsearchStatusException("Cannot open job [" + jobParams.getJobId() +
"] because it has already been opened", RestStatus.CONFLICT, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void onResponse(IndexResponse indexResponse) {

@Override
public void onFailure(Exception e) {
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
listener.onFailure(ExceptionsHelper.badRequestException("Cannot create calendar with id [" +
calendar.getId() + "] as it already exists"));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ protected void masterOperation(Task task, PutDatafeedAction.Request request, Clu
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
},
e -> {
if (e instanceof IndexNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
indicesPrivilegesBuilder.privileges(SearchAction.NAME);
privRequest.indexPrivileges(indicesPrivilegesBuilder.build());
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void onResponse(IndexResponse indexResponse) {
@Override
public void onFailure(Exception e) {
Exception reportedException;
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
reportedException = new ResourceAlreadyExistsException("A filter with id [" + filter.getId()
+ "] already exists");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;

import java.io.IOException;
Expand Down Expand Up @@ -299,7 +300,7 @@ private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMe
// If the task was removed from the node, all is well
// We handle the case of allocation_id changing later in this transport class by timing out waiting for task completion
// Consequently, if the exception is ResourceNotFoundException, continue execution; circuit break otherwise.
ex -> ex instanceof ResourceNotFoundException == false);
ex -> ExceptionsHelper.unwrapCause(ex) instanceof ResourceNotFoundException == false);

for (PersistentTask<?> task : datafeedAndJobTasks) {
chainTaskExecutor.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDataFra

@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
e = new ElasticsearchStatusException("Cannot start data frame analytics [" + request.getId() +
"] because it has already been started", RestStatus.CONFLICT, e);
}
Expand Down Expand Up @@ -342,7 +342,7 @@ private void checkDestIndexIsEmptyIfExists(StartContext startContext, ActionList
}
},
e -> {
if (e instanceof IndexNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
listener.onResponse(startContext);
} else {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafee

@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
logger.debug("datafeed already started", e);
e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() +
"] because it has already been started", RestStatus.CONFLICT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ protected void doRun() {
});
},
e -> {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
// the task has disappeared so must have stopped
listener.onResponse(new StopDataFrameAnalyticsAction.Response(true));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;

Expand Down Expand Up @@ -195,7 +196,7 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persisten
@Override
public void onFailure(Exception e) {
final int slot = counter.incrementAndGet();
if ((e instanceof ResourceNotFoundException &&
if ((ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException &&
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) {
failures.set(slot - 1, e);
}
Expand Down Expand Up @@ -244,7 +245,7 @@ protected void doRun() throws Exception {
});
},
e -> {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
// the task has disappeared so must have stopped
listener.onResponse(new StopDatafeedAction.Response(true));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void onResponse(IndexResponse indexResponse) {
@Override
public void onFailure(Exception e) {
Exception reportedException;
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
reportedException = ExceptionsHelper.conflictStatusException("Error updating filter with id [" + filter.getId()
+ "] because it was modified while the update was in progress", e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void build(String datafeedId, ActionListener<DatafeedJob> listener) {
.size(1)
.includeInterim(false);
jobResultsProvider.bucketsViaInternalClient(jobId, latestBucketQuery, bucketsHandler, e -> {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
QueryPage<Bucket> empty = new QueryPage<>(Collections.emptyList(), 0, Bucket.RESULT_TYPE_FIELD);
bucketsHandler.accept(empty);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void onResponse(PersistentTask<?> persistentTask) {

@Override
public void onFailure(Exception e) {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
// The task was stopped in the meantime, no need to do anything
logger.info("[{}] Aborting as datafeed has been stopped", datafeedId);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
Expand Down Expand Up @@ -59,9 +60,10 @@ static void create(Client client,
}
},
e -> {
if (e instanceof IndexNotFoundException) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId()
+ "] cannot retrieve data because index " + ((IndexNotFoundException)e).getIndex() + " does not exist"));
+ "] cannot retrieve data because index " + ((IndexNotFoundException) cause).getIndex() + " does not exist"));
} else {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ public static void create(Client client,
new ScrollDataExtractorFactory(client, datafeed, job, extractedFields, xContentRegistry, timingStatsReporter));
},
e -> {
if (e instanceof IndexNotFoundException) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId()
+ "] cannot retrieve data because index " + ((IndexNotFoundException) e).getIndex() + " does not exist"));
+ "] cannot retrieve data because index " + ((IndexNotFoundException) cause).getIndex() + " does not exist"));
} else if (e instanceof IllegalArgumentException) {
listener.onFailure(ExceptionsHelper.badRequestException("[" + datafeed.getId() + "] " + e.getMessage()));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void putDatafeedConfig(DatafeedConfig config, Map<String, String> headers
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
listener::onResponse,
e -> {
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
// the dafafeed already exists
listener.onFailure(ExceptionsHelper.datafeedAlreadyExists(datafeedId));
} else {
Expand Down
Loading

0 comments on commit 3bf141d

Please sign in to comment.