From cdc6ee643af14af6b9ccc3500d3eda42baa775e6 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 1 Feb 2019 15:55:55 -0700 Subject: [PATCH 1/4] Add test for `PutFollowAction` on a closed index This is related to #35975. Currently when an index falls behind a leader it encounters a fatal exception. This commit adds a test for that scenario. Additionally, it tests that the user can stop following, close the follower index, and put follow again. After the indexing is re-bootstrapped, it will recover the documents it lost in normal following operations. --- .../xpack/ccr/IndexFollowingIT.java | 106 +++++++++++++++++- 1 file changed, 104 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 74c44704e2e1c..3a76cb7db49ba 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -8,6 +8,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; @@ -16,10 +18,13 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; @@ -75,6 +80,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -86,6 +92,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -947,6 +954,97 @@ public void testUpdateAnalysisLeaderIndexSettings() throws Exception { assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(true)); } + public void testIndexFallBehind() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + final int numDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as first batch", numDocs); + for (int i = 0; i < numDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertTrue(response.isFollowIndexCreated()); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); + for (int i = 0; i < numDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + + pauseFollow("index2"); + + for (int i = 0; i < numDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i * 2); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + leaderClient().admin().indices().flush(new FlushRequest("index1").force(true)).actionGet(); + } + leaderClient().prepareDelete("index1", "doc", "1").get(); + leaderClient().admin().indices().refresh(new RefreshRequest("index1")).actionGet(); + ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index1"); + forceMergeRequest.maxNumSegments(1); + leaderClient().admin().indices().forceMerge(forceMergeRequest).actionGet(); + + followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get(); + + assertBusy(() -> { + List statuses = getFollowTaskStatuses("index2"); + Set exceptions = statuses.stream() + .map(ShardFollowNodeTaskStatus::getFatalException) + .filter(Objects::nonNull) + .map(ExceptionsHelper::unwrapCause) + .filter(e -> e instanceof ResourceNotFoundException) + .map(e -> (ResourceNotFoundException) e) + .collect(Collectors.toSet()); + assertThat(exceptions.size(), greaterThan(0)); + }); + + pauseFollow("index2"); + followerClient().admin().indices().prepareClose("index2").get(); + + + final PutFollowAction.Request followRequest2 = putFollow("index1", "index2"); + PutFollowAction.Response response2 = followerClient().execute(PutFollowAction.INSTANCE, followRequest2).get(); + assertTrue(response2.isFollowIndexCreated()); + assertTrue(response2.isFollowIndexShardsAcked()); + assertTrue(response2.isIndexFollowingStarted()); + + final Map secondBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] secondBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : secondBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long indexCount = shardStats.getStats().getIndexing().getTotal().getIndexCount(); + long deleteCount = shardStats.getStats().getIndexing().getTotal().getDeleteCount(); + final long value = deleteCount + indexCount - 1; + secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard)); + + for (int i = 2; i < numDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i, i * 2)); + } + } + private long getFollowTaskSettingsVersion(String followerIndex) { long settingsVersion = -1L; for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) { @@ -1032,9 +1130,13 @@ private CheckedRunnable assertTask(final int numberOfPrimaryShards, f } private CheckedRunnable assertExpectedDocumentRunnable(final int value) { + return assertExpectedDocumentRunnable(value, value); + } + + private CheckedRunnable assertExpectedDocumentRunnable(final int key, final int value) { return () -> { - final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get(); - assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists()); + final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(key)).get(); + assertTrue("Doc with id [" + key + "] is missing", getResponse.isExists()); assertTrue((getResponse.getSource().containsKey("f"))); assertThat(getResponse.getSource().get("f"), equalTo(value)); }; From 492d80138044427a0f3b025a78b49da39d6256d8 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Sun, 3 Feb 2019 23:11:05 -0700 Subject: [PATCH 2/4] Changes from review --- .../xpack/ccr/IndexFollowingIT.java | 33 ++++--------------- 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 3a76cb7db49ba..2a396854c0250 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -974,17 +974,7 @@ public void testIndexFallBehind() throws Exception { assertTrue(response.isFollowIndexShardsAcked()); assertTrue(response.isIndexFollowingStarted()); - final Map firstBatchNumDocsPerShard = new HashMap<>(); - final ShardStats[] firstBatchShardStats = - leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); - for (final ShardStats shardStats : firstBatchShardStats) { - if (shardStats.getShardRouting().primary()) { - long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; - firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); - } - } - - assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); + assertIndexFullyReplicatedToFollower("index1", "index2"); for (int i = 0; i < numDocs; i++) { assertBusy(assertExpectedDocumentRunnable(i)); } @@ -994,10 +984,10 @@ public void testIndexFallBehind() throws Exception { for (int i = 0; i < numDocs; i++) { final String source = String.format(Locale.ROOT, "{\"f\":%d}", i * 2); leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); - leaderClient().admin().indices().flush(new FlushRequest("index1").force(true)).actionGet(); } leaderClient().prepareDelete("index1", "doc", "1").get(); leaderClient().admin().indices().refresh(new RefreshRequest("index1")).actionGet(); + leaderClient().admin().indices().flush(new FlushRequest("index1").force(true)).actionGet(); ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index1"); forceMergeRequest.maxNumSegments(1); leaderClient().admin().indices().forceMerge(forceMergeRequest).actionGet(); @@ -1012,6 +1002,9 @@ public void testIndexFallBehind() throws Exception { .map(ExceptionsHelper::unwrapCause) .filter(e -> e instanceof ResourceNotFoundException) .map(e -> (ResourceNotFoundException) e) + .filter(e -> { + return e.getMetadataKeys().contains("es.requested_operations_missing"); + }) .collect(Collectors.toSet()); assertThat(exceptions.size(), greaterThan(0)); }); @@ -1026,20 +1019,8 @@ public void testIndexFallBehind() throws Exception { assertTrue(response2.isFollowIndexShardsAcked()); assertTrue(response2.isIndexFollowingStarted()); - final Map secondBatchNumDocsPerShard = new HashMap<>(); - final ShardStats[] secondBatchShardStats = - leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); - for (final ShardStats shardStats : secondBatchShardStats) { - if (shardStats.getShardRouting().primary()) { - long indexCount = shardStats.getStats().getIndexing().getTotal().getIndexCount(); - long deleteCount = shardStats.getStats().getIndexing().getTotal().getDeleteCount(); - final long value = deleteCount + indexCount - 1; - secondBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); - } - } - - assertBusy(assertTask(numberOfPrimaryShards, secondBatchNumDocsPerShard)); - + ensureFollowerGreen("index2"); + assertIndexFullyReplicatedToFollower("index1", "index2"); for (int i = 2; i < numDocs; i++) { assertBusy(assertExpectedDocumentRunnable(i, i * 2)); } From a261662e3e73443b8b257eda6cdd53c0ae4b6c4d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 4 Feb 2019 10:11:44 -0700 Subject: [PATCH 3/4] Add test --- .../xpack/ccr/IndexFollowingIT.java | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 2a396854c0250..80101f15b925f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; @@ -58,6 +59,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.snapshots.SnapshotRestoreException; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.xpack.CcrIntegTestCase; @@ -954,6 +956,26 @@ public void testUpdateAnalysisLeaderIndexSettings() throws Exception { assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(true)); } + public void testMustCloseIndexAndPauseToRestartWithPutFollowing() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + final PutFollowAction.Request followRequest = putFollow("index1", "index2"); + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertTrue(response.isFollowIndexCreated()); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + + final PutFollowAction.Request followRequest2 = putFollow("index1", "index2"); + expectThrows(SnapshotRestoreException.class, () -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet()); + + followerClient().admin().indices().prepareClose("index2").get(); + expectThrows(ResourceAlreadyExistsException.class, () -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet()); + } + public void testIndexFallBehind() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), @@ -1002,15 +1024,13 @@ public void testIndexFallBehind() throws Exception { .map(ExceptionsHelper::unwrapCause) .filter(e -> e instanceof ResourceNotFoundException) .map(e -> (ResourceNotFoundException) e) - .filter(e -> { - return e.getMetadataKeys().contains("es.requested_operations_missing"); - }) + .filter(e -> e.getMetadataKeys().contains("es.requested_operations_missing")) .collect(Collectors.toSet()); assertThat(exceptions.size(), greaterThan(0)); }); - pauseFollow("index2"); followerClient().admin().indices().prepareClose("index2").get(); + pauseFollow("index2"); final PutFollowAction.Request followRequest2 = putFollow("index1", "index2"); From e2025f35735083148acd1f1ecaca0716687874ff Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 4 Feb 2019 20:49:06 +0100 Subject: [PATCH 4/4] fix checkstyle --- .../java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 80101f15b925f..eee28b5875bcc 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -970,10 +970,12 @@ public void testMustCloseIndexAndPauseToRestartWithPutFollowing() throws Excepti assertTrue(response.isIndexFollowingStarted()); final PutFollowAction.Request followRequest2 = putFollow("index1", "index2"); - expectThrows(SnapshotRestoreException.class, () -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet()); + expectThrows(SnapshotRestoreException.class, + () -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet()); followerClient().admin().indices().prepareClose("index2").get(); - expectThrows(ResourceAlreadyExistsException.class, () -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet()); + expectThrows(ResourceAlreadyExistsException.class, + () -> followerClient().execute(PutFollowAction.INSTANCE, followRequest2).actionGet()); } public void testIndexFallBehind() throws Exception {