Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Add logging of shard failures #75275

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* An exception indicating that a failure occurred performing an operation on the shard.
*
*/
public abstract class ShardOperationFailedException implements Writeable, ToXContentObject {
public abstract class ShardOperationFailedException extends Exception implements Writeable, ToXContentObject {

protected String index;
protected int shardId = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@
import org.elasticsearch.ElasticsearchException;

class CheckpointException extends ElasticsearchException {
CheckpointException(String msg, Object... params) {
super(msg, null, params);
}

CheckpointException(String msg, Throwable cause, Object... params) {
super(msg, cause, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,19 @@ private static void getCheckpointsFromOneCluster(
new IndicesStatsRequest().indices(indices).clear().indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
ActionListener.wrap(response -> {
if (response.getFailedShards() != 0) {
listener.onFailure(new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards"));
for (int i = 0; i < response.getShardFailures().length; ++i) {
logger.warn(
new ParameterizedMessage(
"Source has [{}] failed shards, shard failure [{}]",
response.getFailedShards(), i).getFormattedMessage(),
response.getShardFailures()[i]);
}
listener.onFailure(
new CheckpointException(
"Source has [{}] failed shards, first shard failure: {}",
response.getShardFailures()[0],
response.getFailedShards(),
response.getShardFailures()[0].toString()));
return;
}
listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices, prefix));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class RemoteClusterResolver extends RemoteClusterAware {

private final CopyOnWriteArraySet<String> clusters;

class ResolvedIndices {
static class ResolvedIndices {
private final Map<String, List<String>> remoteIndicesPerClusterAlias;
private final List<String> localIndices;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ protected void createCheckpoint(ActionListener<TransformCheckpoint> listener) {
logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", getJobId()), createCheckpointException);
listener.onFailure(
new RuntimeException(
"Failed to create checkpoint due to " + createCheckpointException.getMessage(),
"Failed to create checkpoint due to: " + createCheckpointException.getMessage(),
createCheckpointException
)
);
Expand All @@ -237,7 +237,7 @@ protected void createCheckpoint(ActionListener<TransformCheckpoint> listener) {
logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", getJobId()), getCheckPointException);
listener.onFailure(
new RuntimeException(
"Failed to retrieve checkpoint due to " + getCheckPointException.getMessage(),
"Failed to retrieve checkpoint due to: " + getCheckPointException.getMessage(),
getCheckPointException
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,44 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.MockLogAppender.LoggingExpectation;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests;
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor.AuditExpectation;
import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager;
import org.junit.Before;
import org.mockito.stubbing.Answer;

import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DefaultCheckpointProviderTests extends ESTestCase {

Expand All @@ -40,7 +59,10 @@ public class DefaultCheckpointProviderTests extends ESTestCase {

@Before
public void setUpMocks() {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
transformConfigManager = mock(IndexBasedTransformConfigManager.class);
transformAuditor = MockTransformAuditor.createMockAuditor();
}
Expand Down Expand Up @@ -188,6 +210,55 @@ public void testReportSourceIndexChangesAddDeleteMany() throws Exception {
);
}

public void testHandlingShardFailures() throws Exception {
String transformId = getTestName();
String indexName = "some-index";
TransformConfig transformConfig =
new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId))
.setSource(new SourceConfig(indexName))
.build();

RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class);
doReturn(new RemoteClusterResolver.ResolvedIndices(Collections.emptyMap(), Collections.singletonList(indexName)))
.when(remoteClusterResolver).resolve(transformConfig.getSource().getIndex());

GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { indexName }, null, null, null, null, null);
doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any());

IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
doReturn(7).when(indicesStatsResponse).getFailedShards();
doReturn(
new DefaultShardOperationFailedException[] {
new DefaultShardOperationFailedException(indexName, 3, new Exception("something's wrong"))
}).when(indicesStatsResponse).getShardFailures();
doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());

DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
client,
remoteClusterResolver,
transformConfigManager,
transformAuditor,
transformConfig
);

CountDownLatch latch = new CountDownLatch(1);
provider.createNextCheckpoint(
null,
new LatchedActionListener<>(
ActionListener.wrap(
response -> fail("This test case must fail"),
e -> assertThat(
e.getMessage(),
startsWith(
"Source has [7] failed shards, first shard failure: [some-index][3] failed, "
+ "reason [java.lang.Exception: something's wrong"))
),
latch
)
);
latch.await(10, TimeUnit.SECONDS);
}

private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpectation auditExpectation, Runnable codeBlock)
throws IllegalAccessException {
MockLogAppender mockLogAppender = new MockLogAppender();
Expand All @@ -210,4 +281,12 @@ private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpec
}
}

@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withResponse(Response response) {
return invocationOnMock -> {
ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
listener.onResponse(response);
return null;
};
}
}