From 31bc04782cd381e98a3d557baca8e495ef6bb9a5 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 25 Oct 2018 13:32:21 +0200 Subject: [PATCH 1/4] [CCR] Retry when no index shard stats can be found Index shard stats for the follower shard are fetched, when a shard follow task is started. This is needed in order to bootstap the shard follow task with the follower global checkpoint. Sometimes index shard stats are not available (e.g. during a restart) and we fail now, while it is very likely that these stats will be available some time later. --- .../ccr/action/ShardFollowTasksExecutor.java | 2 +- .../elasticsearch/xpack/CcrIntegTestCase.java | 94 +++++++++++++++++++ .../xpack/ccr/IndexFollowingIT.java | 82 ---------------- .../xpack/ccr/RestartIndexFollowingIT.java | 62 ++++++++++++ 4 files changed, 157 insertions(+), 83 deletions(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 5a82b45cf8c38..313cea53e73ab 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -185,7 +185,7 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll return; } - if (ShardFollowNodeTask.shouldRetry(e)) { + if (ShardFollowNodeTask.shouldRetry(e) || e instanceof IndexNotFoundException) { logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", shardFollowNodeTask), e); threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index a4f9d69bfa924..c3cbc7436d987 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -8,6 +8,9 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -24,9 +27,11 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; @@ -35,6 +40,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalTestCluster; @@ -48,6 +54,9 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -59,14 +68,17 @@ import java.util.Collection; import java.util.Collections; import java.util.Locale; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING; import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -279,6 +291,88 @@ protected void ensureEmptyWriteBuffers() throws Exception { }); } + protected void pauseFollow(String... indices) throws Exception { + for (String index : indices) { + final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index); + followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get(); + } + ensureNoCcrTasks(); + } + + protected void ensureNoCcrTasks() throws Exception { + assertBusy(() -> { + final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); + final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasks.tasks(), empty()); + + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setDetailed(true); + ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get(); + int numNodeTasks = 0; + for (TaskInfo taskInfo : listTasksResponse.getTasks()) { + if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) { + numNodeTasks++; + } + } + assertThat(numNodeTasks, equalTo(0)); + }, 30, TimeUnit.SECONDS); + } + + protected String getIndexSettings(final int numberOfShards, final int numberOfReplicas, + final Map additionalIndexSettings) throws IOException { + final String settings; + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("settings"); + { + builder.field("index.number_of_shards", numberOfShards); + builder.field("index.number_of_replicas", numberOfReplicas); + for (final Map.Entry additionalSetting : additionalIndexSettings.entrySet()) { + builder.field(additionalSetting.getKey(), additionalSetting.getValue()); + } + } + builder.endObject(); + builder.startObject("mappings"); + { + builder.startObject("doc"); + { + builder.startObject("properties"); + { + builder.startObject("f"); + { + builder.field("type", "integer"); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + settings = BytesReference.bytes(builder).utf8ToString(); + } + return settings; + } + + public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) { + PutFollowAction.Request request = new PutFollowAction.Request(); + request.setRemoteCluster("leader_cluster"); + request.setLeaderIndex(leaderIndex); + request.setFollowRequest(resumeFollow(followerIndex)); + return request; + } + + public static ResumeFollowAction.Request resumeFollow(String followerIndex) { + ResumeFollowAction.Request request = new ResumeFollowAction.Request(); + request.setFollowerIndex(followerIndex); + request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); + request.setReadPollTimeout(TimeValue.timeValueMillis(10)); + return request; + } + static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception { CountDownLatch latch = new CountDownLatch(1); clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 794c64e6bc4ff..d755e495b7d08 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -8,7 +8,6 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; @@ -756,33 +755,6 @@ private CheckedRunnable assertTask(final int numberOfPrimaryShards, f }; } - private void pauseFollow(String... indices) throws Exception { - for (String index : indices) { - final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index); - followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get(); - } - ensureNoCcrTasks(); - } - - private void ensureNoCcrTasks() throws Exception { - assertBusy(() -> { - final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); - final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks(), empty()); - - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get(); - int numNodeTasks = 0; - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) { - numNodeTasks++; - } - } - assertThat(numNodeTasks, equalTo(0)); - }, 30, TimeUnit.SECONDS); - } - private CheckedRunnable assertExpectedDocumentRunnable(final int value) { return () -> { final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get(); @@ -792,45 +764,6 @@ private CheckedRunnable assertExpectedDocumentRunnable(final int valu }; } - private String getIndexSettings(final int numberOfShards, final int numberOfReplicas, - final Map additionalIndexSettings) throws IOException { - final String settings; - try (XContentBuilder builder = jsonBuilder()) { - builder.startObject(); - { - builder.startObject("settings"); - { - builder.field("index.number_of_shards", numberOfShards); - builder.field("index.number_of_replicas", numberOfReplicas); - for (final Map.Entry additionalSetting : additionalIndexSettings.entrySet()) { - builder.field(additionalSetting.getKey(), additionalSetting.getValue()); - } - } - builder.endObject(); - builder.startObject("mappings"); - { - builder.startObject("doc"); - { - builder.startObject("properties"); - { - builder.startObject("f"); - { - builder.field("type", "integer"); - } - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - } - builder.endObject(); - settings = BytesReference.bytes(builder).utf8ToString(); - } - return settings; - } - private String getIndexSettingsWithNestedMapping(final int numberOfShards, final int numberOfReplicas, final Map additionalIndexSettings) throws IOException { final String settings; @@ -968,19 +901,4 @@ private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numbe }); } - public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) { - PutFollowAction.Request request = new PutFollowAction.Request(); - request.setRemoteCluster("leader_cluster"); - request.setLeaderIndex(leaderIndex); - request.setFollowRequest(resumeFollow(followerIndex)); - return request; - } - - public static ResumeFollowAction.Request resumeFollow(String followerIndex) { - ResumeFollowAction.Request request = new ResumeFollowAction.Request(); - request.setFollowerIndex(followerIndex); - request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); - request.setReadPollTimeout(TimeValue.timeValueMillis(10)); - return request; - } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java new file mode 100644 index 0000000000000..49fbe15ddabae --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; + +import java.util.Locale; + +import static java.util.Collections.singletonMap; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +public class RestartIndexFollowingIT extends CcrIntegTestCase { + + @Override + protected int numberOfNodesPerCluster() { + return 1; + } + + public void testFollowIndex() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderGreen("index1"); + + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + final long firstBatchNumDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); + for (int i = 0; i < firstBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + assertBusy(() -> { + assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs)); + }); + + getFollowerCluster().fullRestart(); + ensureFollowerGreen("index2"); + + final long secondBatchNumDocs = randomIntBetween(2, 64); + for (int i = 0; i < secondBatchNumDocs; i++) { + leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + } + + assertBusy(() -> { + assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, + equalTo(firstBatchNumDocs + secondBatchNumDocs)); + }); + } + +} From 190106fb274bf3b703b038a3ed9d00491bbba6be Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 25 Oct 2018 14:06:35 +0200 Subject: [PATCH 2/4] added a comment --- .../xpack/ccr/action/ShardFollowTasksExecutor.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 313cea53e73ab..34d30f5731bcc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -185,6 +185,9 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll return; } + // fetchFollowerShardInfo(...) can thrown IndexNotFoundException when no index stats are found for the follow shard and + // in this case we do want to retry since this is likely to happen when a shard task restarts + // Also prior to following an index; validation is in place to check whether the follow index actually exists. if (ShardFollowNodeTask.shouldRetry(e) || e instanceof IndexNotFoundException) { logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", shardFollowNodeTask), e); From 4b2fb62a4b35bb461c0c4469833a85951aac85e5 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 25 Oct 2018 15:23:53 +0200 Subject: [PATCH 3/4] instead of just throwing IndexNotFoundException, throw a ShardNotFoundException if the index exists in local cluster state and a IndexNotFoundException if it no longer exists. This way we can complete rely on ShardFollowNodeTask.shouldRetry(...) --- .../xpack/ccr/action/ShardFollowTasksExecutor.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 34d30f5731bcc..14b87ac0d9845 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -188,7 +188,7 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll // fetchFollowerShardInfo(...) can thrown IndexNotFoundException when no index stats are found for the follow shard and // in this case we do want to retry since this is likely to happen when a shard task restarts // Also prior to following an index; validation is in place to check whether the follow index actually exists. - if (ShardFollowNodeTask.shouldRetry(e) || e instanceof IndexNotFoundException) { + if (ShardFollowNodeTask.shouldRetry(e)) { logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", shardFollowNodeTask), e); threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state)); @@ -208,7 +208,12 @@ private void fetchFollowerShardInfo( client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> { IndexStats indexStats = r.getIndex(shardId.getIndexName()); if (indexStats == null) { - errorHandler.accept(new IndexNotFoundException(shardId.getIndex())); + IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex()); + if (indexMetaData != null) { + errorHandler.accept(new ShardNotFoundException(shardId)); + } else { + errorHandler.accept(new IndexNotFoundException(shardId.getIndex())); + } return; } From b946e30ab2c18cb639200fe4bb5b18f38dd4d1a9 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 25 Oct 2018 15:25:21 +0200 Subject: [PATCH 4/4] removed comment --- .../xpack/ccr/action/ShardFollowTasksExecutor.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 14b87ac0d9845..88d07566c74bd 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -185,9 +185,6 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll return; } - // fetchFollowerShardInfo(...) can thrown IndexNotFoundException when no index stats are found for the follow shard and - // in this case we do want to retry since this is likely to happen when a shard task restarts - // Also prior to following an index; validation is in place to check whether the follow index actually exists. if (ShardFollowNodeTask.shouldRetry(e)) { logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", shardFollowNodeTask), e);