diff --git a/server/src/main/java/org/elasticsearch/action/ShardOperationFailedException.java b/server/src/main/java/org/elasticsearch/action/ShardOperationFailedException.java index 95dffc11c4897..e3691c8d606e0 100644 --- a/server/src/main/java/org/elasticsearch/action/ShardOperationFailedException.java +++ b/server/src/main/java/org/elasticsearch/action/ShardOperationFailedException.java @@ -19,7 +19,7 @@ * An exception indicating that a failure occurred performing an operation on the shard. * */ -public abstract class ShardOperationFailedException implements Writeable, ToXContentObject { +public abstract class ShardOperationFailedException extends Exception implements Writeable, ToXContentObject { protected String index; protected int shardId = -1; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointException.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointException.java index 1a918e19ca3da..7cf1f3b9adb0f 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointException.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/CheckpointException.java @@ -10,10 +10,6 @@ import org.elasticsearch.ElasticsearchException; class CheckpointException extends ElasticsearchException { - CheckpointException(String msg, Object... params) { - super(msg, null, params); - } - CheckpointException(String msg, Throwable cause, Object... params) { super(msg, cause, params); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index c9ec4f1af8df7..5a4acfea022f8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -165,7 +165,19 @@ private static void getCheckpointsFromOneCluster( new IndicesStatsRequest().indices(indices).clear().indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN), ActionListener.wrap(response -> { if (response.getFailedShards() != 0) { - listener.onFailure(new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards")); + for (int i = 0; i < response.getShardFailures().length; ++i) { + logger.warn( + new ParameterizedMessage( + "Source has [{}] failed shards, shard failure [{}]", + response.getFailedShards(), i).getFormattedMessage(), + response.getShardFailures()[i]); + } + listener.onFailure( + new CheckpointException( + "Source has [{}] failed shards, first shard failure: {}", + response.getShardFailures()[0], + response.getFailedShards(), + response.getShardFailures()[0].toString())); return; } listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices, prefix)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java index 088c8ff1d0c2e..83e525a7a7fba 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java @@ -24,7 +24,7 @@ class RemoteClusterResolver extends RemoteClusterAware { private final CopyOnWriteArraySet clusters; - class ResolvedIndices { + static class ResolvedIndices { private final Map> remoteIndicesPerClusterAlias; private final List localIndices; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index eba9b28aab09a..93c21854514b9 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -227,7 +227,7 @@ protected void createCheckpoint(ActionListener listener) { logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", getJobId()), createCheckpointException); listener.onFailure( new RuntimeException( - "Failed to create checkpoint due to " + createCheckpointException.getMessage(), + "Failed to create checkpoint due to: " + createCheckpointException.getMessage(), createCheckpointException ) ); @@ -237,7 +237,7 @@ protected void createCheckpoint(ActionListener listener) { logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", getJobId()), getCheckPointException); listener.onFailure( new RuntimeException( - "Failed to retrieve checkpoint due to " + getCheckPointException.getMessage(), + "Failed to retrieve checkpoint due to: " + getCheckPointException.getMessage(), getCheckPointException ) ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java index dc4b91f1e36ca..05c61cd233b54 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java @@ -10,25 +10,44 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.admin.indices.get.GetIndexAction; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.MockLogAppender.LoggingExpectation; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor.AuditExpectation; import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; import org.junit.Before; +import org.mockito.stubbing.Answer; import java.util.Collections; import java.util.HashSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class DefaultCheckpointProviderTests extends ESTestCase { @@ -40,7 +59,10 @@ public class DefaultCheckpointProviderTests extends ESTestCase { @Before public void setUpMocks() { + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); client = mock(Client.class); + when(client.threadPool()).thenReturn(threadPool); transformConfigManager = mock(IndexBasedTransformConfigManager.class); transformAuditor = MockTransformAuditor.createMockAuditor(); } @@ -188,6 +210,55 @@ public void testReportSourceIndexChangesAddDeleteMany() throws Exception { ); } + public void testHandlingShardFailures() throws Exception { + String transformId = getTestName(); + String indexName = "some-index"; + TransformConfig transformConfig = + new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId)) + .setSource(new SourceConfig(indexName)) + .build(); + + RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class); + doReturn(new RemoteClusterResolver.ResolvedIndices(Collections.emptyMap(), Collections.singletonList(indexName))) + .when(remoteClusterResolver).resolve(transformConfig.getSource().getIndex()); + + GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { indexName }, null, null, null, null, null); + doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any()); + + IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class); + doReturn(7).when(indicesStatsResponse).getFailedShards(); + doReturn( + new DefaultShardOperationFailedException[] { + new DefaultShardOperationFailedException(indexName, 3, new Exception("something's wrong")) + }).when(indicesStatsResponse).getShardFailures(); + doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); + + DefaultCheckpointProvider provider = new DefaultCheckpointProvider( + client, + remoteClusterResolver, + transformConfigManager, + transformAuditor, + transformConfig + ); + + CountDownLatch latch = new CountDownLatch(1); + provider.createNextCheckpoint( + null, + new LatchedActionListener<>( + ActionListener.wrap( + response -> fail("This test case must fail"), + e -> assertThat( + e.getMessage(), + startsWith( + "Source has [7] failed shards, first shard failure: [some-index][3] failed, " + + "reason [java.lang.Exception: something's wrong")) + ), + latch + ) + ); + latch.await(10, TimeUnit.SECONDS); + } + private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpectation auditExpectation, Runnable codeBlock) throws IllegalAccessException { MockLogAppender mockLogAppender = new MockLogAppender(); @@ -210,4 +281,12 @@ private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpec } } + @SuppressWarnings("unchecked") + private static Answer withResponse(Response response) { + return invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(response); + return null; + }; + } }