From e73140e4c125ec2302bf8e4566c0995bc3e99574 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 31 Jan 2019 09:31:02 -0700 Subject: [PATCH] Use `CcrRepository` to init follower index (#35719) (#37988) This commit modifies the put follow index action to use a CcrRepository when creating a follower index. It routes the logic through the snapshot/restore process. A wait_for_active_shards parameter can be used to configure how long to wait before returning the response. --- .../client/CcrRequestConverters.java | 3 + .../client/ccr/PutFollowRequest.java | 18 +- .../java/org/elasticsearch/client/CCRIT.java | 3 +- .../documentation/CCRDocumentationIT.java | 14 +- .../high-level/ccr/put_follow.asciidoc | 2 + .../ccr/apis/follow/get-follow-info.asciidoc | 2 +- .../ccr/apis/follow/get-follow-stats.asciidoc | 2 +- .../apis/follow/post-pause-follow.asciidoc | 2 +- .../apis/follow/post-resume-follow.asciidoc | 2 +- .../ccr/apis/follow/post-unfollow.asciidoc | 2 +- .../ccr/apis/follow/put-follow.asciidoc | 9 +- .../reference/ccr/apis/get-ccr-stats.asciidoc | 2 +- docs/reference/ccr/getting-started.asciidoc | 2 +- .../restore/RestoreClusterStateListener.java | 87 +++++++ .../TransportRestoreSnapshotAction.java | 43 +--- .../test/ccr/follow_and_unfollow.yml | 1 + .../rest-api-spec/test/ccr/follow_info.yml | 1 + .../rest-api-spec/test/ccr/follow_stats.yml | 1 + .../index_directly_into_follower_index.yml | 1 + .../xpack/ccr/ESCCRRestTestCase.java | 3 +- .../xpack/ccr/CcrLicenseChecker.java | 19 +- .../xpack/ccr/action/Pre67PutFollow.java | 141 +++++++++++ .../ccr/action/TransportPutFollowAction.java | 218 ++++++++++-------- .../action/TransportResumeFollowAction.java | 7 +- .../xpack/ccr/repository/CcrRepository.java | 18 +- .../xpack/ccr/rest/RestPutFollowAction.java | 4 +- .../elasticsearch/xpack/CcrIntegTestCase.java | 13 +- .../xpack/CcrSingleNodeTestCase.java | 2 + .../xpack/ccr/IndexFollowingIT.java | 148 +++++++++++- .../xpack/ccr/LocalIndexFollowingIT.java | 6 +- 
.../action/PutFollowActionRequestTests.java | 3 +- .../core/ccr/action/PutFollowAction.java | 60 ++++- .../indexlifecycle/CCRIndexLifecycleIT.java | 5 +- .../rest-api-spec/api/ccr.follow.json | 7 + 34 files changed, 668 insertions(+), 183 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/Pre67PutFollow.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java index df1e5dc01aef5..7210f903fd362 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java @@ -23,6 +23,7 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.ccr.CcrStatsRequest; import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest; import org.elasticsearch.client.ccr.FollowStatsRequest; @@ -46,6 +47,8 @@ static Request putFollow(PutFollowRequest putFollowRequest) throws IOException { .addPathPartAsIs("_ccr", "follow") .build(); Request request = new Request(HttpPut.METHOD_NAME, endpoint); + RequestConverters.Params parameters = new RequestConverters.Params(request); + parameters.withWaitForActiveShards(putFollowRequest.waitForActiveShards(), ActiveShardCount.NONE); request.setEntity(createEntity(putFollowRequest, REQUEST_BODY_CONTENT_TYPE)); return request; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java index 
98e9d224564cf..8307b04bd7087 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.ccr; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Validatable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -36,11 +37,17 @@ public final class PutFollowRequest extends FollowConfig implements Validatable, private final String remoteCluster; private final String leaderIndex; private final String followerIndex; + private final ActiveShardCount waitForActiveShards; public PutFollowRequest(String remoteCluster, String leaderIndex, String followerIndex) { + this(remoteCluster, leaderIndex, followerIndex, ActiveShardCount.NONE); + } + + public PutFollowRequest(String remoteCluster, String leaderIndex, String followerIndex, ActiveShardCount waitForActiveShards) { this.remoteCluster = Objects.requireNonNull(remoteCluster, "remoteCluster"); this.leaderIndex = Objects.requireNonNull(leaderIndex, "leaderIndex"); this.followerIndex = Objects.requireNonNull(followerIndex, "followerIndex"); + this.waitForActiveShards = waitForActiveShards; } @Override @@ -66,13 +73,18 @@ public String getFollowerIndex() { return followerIndex; } + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; PutFollowRequest that = (PutFollowRequest) o; - return Objects.equals(remoteCluster, that.remoteCluster) && + return Objects.equals(waitForActiveShards, that.waitForActiveShards) && + Objects.equals(remoteCluster, that.remoteCluster) && Objects.equals(leaderIndex, that.leaderIndex) && Objects.equals(followerIndex, 
that.followerIndex); } @@ -83,7 +95,7 @@ public int hashCode() { super.hashCode(), remoteCluster, leaderIndex, - followerIndex - ); + followerIndex, + waitForActiveShards); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java index 9ab65448e45a0..46393fd8da276 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.ccr.CcrStatsRequest; import org.elasticsearch.client.ccr.CcrStatsResponse; @@ -98,7 +99,7 @@ public void testIndexFollowing() throws Exception { CreateIndexResponse response = highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT); assertThat(response.isAcknowledged(), is(true)); - PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower"); + PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower", ActiveShardCount.ONE); PutFollowResponse putFollowResponse = execute(putFollowRequest, ccrClient::putFollow, ccrClient::putFollowAsync); assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java index 9ef01a1a6f7bf..2e54d1c4a1a7c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java +++ 
b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; @@ -97,7 +98,8 @@ public void testPutFollow() throws Exception { PutFollowRequest putFollowRequest = new PutFollowRequest( "local", // <1> "leader", // <2> - "follower" // <3> + "follower", // <3> + ActiveShardCount.ONE // <4> ); // end::ccr-put-follow-request @@ -175,7 +177,7 @@ public void testPauseFollow() throws Exception { String followIndex = "follower"; // Follow index, so that it can be paused: { - PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex); + PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE); PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); @@ -241,7 +243,7 @@ public void testResumeFollow() throws Exception { String followIndex = "follower"; // Follow index, so that it can be paused: { - PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex); + PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE); PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); assertThat(putFollowResponse.isFollowIndexShardsAcked(), 
is(true)); @@ -317,7 +319,7 @@ public void testUnfollow() throws Exception { String followIndex = "follower"; // Follow index, pause and close, so that it can be unfollowed: { - PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex); + PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE); PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); @@ -349,7 +351,7 @@ public void testUnfollow() throws Exception { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(followIndex); assertThat(client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT).isAcknowledged(), is(true)); - PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex); + PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE); PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); @@ -639,7 +641,7 @@ public void testGetFollowStats() throws Exception { } { // Follow index, so that we can query for follow stats: - PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower"); + PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower", ActiveShardCount.ONE); PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); diff --git a/docs/java-rest/high-level/ccr/put_follow.asciidoc b/docs/java-rest/high-level/ccr/put_follow.asciidoc 
index 839a9bedc6552..2f40bbd5d2b2d 100644 --- a/docs/java-rest/high-level/ccr/put_follow.asciidoc +++ b/docs/java-rest/high-level/ccr/put_follow.asciidoc @@ -20,6 +20,8 @@ include-tagged::{doc-tests-file}[{api}-request] <1> The name of the remote cluster alias. <2> The name of the leader in the remote cluster. <3> The name of the follower index that gets created as part of the put follow API call. +<4> The number of active shard copies to wait for before the put follow API returns a +response, as an `ActiveShardCount` [id="{upid}-{api}-response"] ==== Response diff --git a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc index 22418db10887c..212b1167b6e33 100644 --- a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc +++ b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc @@ -22,7 +22,7 @@ replication options and whether the follower indices are active or paused. [source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index" diff --git a/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc b/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc index 766f502ff93a3..8c02582e01278 100644 --- a/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc +++ b/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc @@ -21,7 +21,7 @@ following tasks associated with each shard for the specified indices. 
[source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index" diff --git a/docs/reference/ccr/apis/follow/post-pause-follow.asciidoc b/docs/reference/ccr/apis/follow/post-pause-follow.asciidoc index 0d56ee76bd9b9..f5b0bef7b2994 100644 --- a/docs/reference/ccr/apis/follow/post-pause-follow.asciidoc +++ b/docs/reference/ccr/apis/follow/post-pause-follow.asciidoc @@ -24,7 +24,7 @@ following task. [source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index" diff --git a/docs/reference/ccr/apis/follow/post-resume-follow.asciidoc b/docs/reference/ccr/apis/follow/post-resume-follow.asciidoc index e8b4cd50f27e7..736061f2bfde8 100644 --- a/docs/reference/ccr/apis/follow/post-resume-follow.asciidoc +++ b/docs/reference/ccr/apis/follow/post-resume-follow.asciidoc @@ -23,7 +23,7 @@ returns, the follower index will resume fetching operations from the leader inde [source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index" diff --git a/docs/reference/ccr/apis/follow/post-unfollow.asciidoc b/docs/reference/ccr/apis/follow/post-unfollow.asciidoc index 6507c04ac5026..c3126d02d1efc 100644 --- a/docs/reference/ccr/apis/follow/post-unfollow.asciidoc +++ b/docs/reference/ccr/apis/follow/post-unfollow.asciidoc @@ -27,7 +27,7 @@ irreversible operation. 
[source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index" diff --git a/docs/reference/ccr/apis/follow/put-follow.asciidoc b/docs/reference/ccr/apis/follow/put-follow.asciidoc index 3f6156c1e6820..52253d6ad2f4c 100644 --- a/docs/reference/ccr/apis/follow/put-follow.asciidoc +++ b/docs/reference/ccr/apis/follow/put-follow.asciidoc @@ -31,7 +31,7 @@ POST /follower_index/_ccr/pause_follow [source,js] -------------------------------------------------- -PUT //_ccr/follow +PUT //_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "", "leader_index" : "" @@ -43,6 +43,11 @@ PUT //_ccr/follow // TEST[s//remote_cluster/] // TEST[s//leader_index/] +The `wait_for_active_shards` parameter specifies the number of shards to wait on being active +before responding. This defaults to waiting on none of the shards to be active. A shard must +be restored from the leader index before being active. Restoring a follower shard requires transferring +all the remote Lucene segment files to the follower index. + ==== Path Parameters `follower_index` (required):: @@ -73,7 +78,7 @@ This example creates a follower index named `follower_index`: [source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index", diff --git a/docs/reference/ccr/apis/get-ccr-stats.asciidoc b/docs/reference/ccr/apis/get-ccr-stats.asciidoc index f47e49ee82674..8949de8787fa7 100644 --- a/docs/reference/ccr/apis/get-ccr-stats.asciidoc +++ b/docs/reference/ccr/apis/get-ccr-stats.asciidoc @@ -22,7 +22,7 @@ shard-level stats as in the <>. 
[source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index" diff --git a/docs/reference/ccr/getting-started.asciidoc b/docs/reference/ccr/getting-started.asciidoc index b44153a74bc4a..c83d43e2ffbe4 100644 --- a/docs/reference/ccr/getting-started.asciidoc +++ b/docs/reference/ccr/getting-started.asciidoc @@ -238,7 +238,7 @@ cluster. [source,js] -------------------------------------------------- -PUT /server-metrics-copy/_ccr/follow +PUT /server-metrics-copy/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "leader", "leader_index" : "server-metrics" diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java new file mode 100644 index 0000000000000..a74aad3ddb586 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java @@ -0,0 +1,87 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.admin.cluster.snapshots.restore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.snapshots.RestoreInfo; +import org.elasticsearch.snapshots.RestoreService; + +import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; + +public class RestoreClusterStateListener implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(RestoreClusterStateListener.class); + + private final ClusterService clusterService; + private final String uuid; + private final ActionListener listener; + + + private RestoreClusterStateListener(ClusterService clusterService, RestoreService.RestoreCompletionResponse response, + ActionListener listener) { + this.clusterService = clusterService; + this.uuid = response.getUuid(); + this.listener = listener; + } + + @Override + public void clusterChanged(ClusterChangedEvent changedEvent) { + final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid); + final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid); + if (prevEntry == null) { + // When there is a master failure after a restore has been started, this listener might not be registered + // on the current master and as such it might miss some intermediary cluster states due to batching. + // Clean up listener in that case and acknowledge completion of restore operation to client. 
+ clusterService.removeListener(this); + listener.onResponse(new RestoreSnapshotResponse(null)); + } else if (newEntry == null) { + clusterService.removeListener(this); + ImmutableOpenMap shards = prevEntry.shards(); + assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state(); + assert RestoreService.completed(shards) : "expected all restore entries to be completed"; + RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(), + prevEntry.indices(), + shards.size(), + shards.size() - RestoreService.failedShards(shards)); + RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri); + logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId()); + listener.onResponse(response); + } else { + // restore not completed yet, wait for next cluster state update + } + } + + /** + * Creates a cluster state listener and registers it with the cluster service. The listener passed as a + * parameter will be called when the restore is complete. 
+ */ + public static void createAndRegisterListener(ClusterService clusterService, RestoreService.RestoreCompletionResponse response, + ActionListener listener) { + clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener)); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index 111e183336071..fe2b47e41690e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -22,27 +22,18 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse; -import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import static 
org.elasticsearch.snapshots.RestoreService.restoreInProgress; - /** * Transport action for restore snapshot operation */ @@ -87,39 +78,7 @@ protected void masterOperation(final RestoreSnapshotRequest request, final Clust @Override public void onResponse(RestoreCompletionResponse restoreCompletionResponse) { if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { - final Snapshot snapshot = restoreCompletionResponse.getSnapshot(); - String uuid = restoreCompletionResponse.getUuid(); - - ClusterStateListener clusterStateListener = new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent changedEvent) { - final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid); - final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid); - if (prevEntry == null) { - // When there is a master failure after a restore has been started, this listener might not be registered - // on the current master and as such it might miss some intermediary cluster states due to batching. - // Clean up listener in that case and acknowledge completion of restore operation to client. 
- clusterService.removeListener(this); - listener.onResponse(new RestoreSnapshotResponse(null)); - } else if (newEntry == null) { - clusterService.removeListener(this); - ImmutableOpenMap shards = prevEntry.shards(); - assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state(); - assert RestoreService.completed(shards) : "expected all restore entries to be completed"; - RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(), - prevEntry.indices(), - shards.size(), - shards.size() - RestoreService.failedShards(shards)); - RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri); - logger.debug("restore of [{}] completed", snapshot); - listener.onResponse(response); - } else { - // restore not completed yet, wait for next cluster state update - } - } - }; - - clusterService.addListener(clusterStateListener); + RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, listener); } else { listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); } diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml index f73f5c6dfb2d3..d5cd8ebd4f1ab 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml @@ -37,6 +37,7 @@ - do: ccr.follow: index: bar + wait_for_active_shards: 1 body: remote_cluster: local leader_index: foo diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_info.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_info.yml index f1e47d830cf97..8383ecd4e6851 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_info.yml +++ 
b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_info.yml @@ -33,6 +33,7 @@ - do: ccr.follow: index: bar + wait_for_active_shards: 1 body: remote_cluster: local leader_index: foo diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml index ba45d4a2fb31c..647eeb8c64562 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml @@ -37,6 +37,7 @@ - do: ccr.follow: index: bar + wait_for_active_shards: 1 body: remote_cluster: local leader_index: foo diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/index_directly_into_follower_index.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/index_directly_into_follower_index.yml index d1da84932e598..e8f257aa66e2c 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/index_directly_into_follower_index.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/index_directly_into_follower_index.yml @@ -37,6 +37,7 @@ - do: ccr.follow: index: bar + wait_for_active_shards: 1 body: remote_cluster: local leader_index: foo diff --git a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java index f1bc27e817958..1dbfe6063f13b 100644 --- a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java +++ b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java @@ -72,7 +72,7 @@ protected static void followIndex(String leaderCluster, String leaderIndex, Stri } protected static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws 
IOException { - final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow"); + final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow?wait_for_active_shards=1"); request.setJsonEntity("{\"remote_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex + "\", \"read_poll_timeout\": \"10ms\"}"); assertOK(client.performRequest(request)); @@ -200,6 +200,7 @@ protected static Map toMap(String response) { protected static void ensureYellow(String index) throws IOException { Request request = new Request("GET", "/_cluster/health/" + index); request.addParameter("wait_for_status", "yellow"); + request.addParameter("wait_for_active_shards", "1"); request.addParameter("wait_for_no_relocating_shards", "true"); request.addParameter("wait_for_no_initializing_shards", "true"); request.addParameter("timeout", "70s"); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index c58f5d8fb62b3..884d12afe6650 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.FilterClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Strings; @@ -94,28 +95,32 @@ public boolean isCcrAllowed() { } /** - * Fetches the leader index metadata and history UUIDs for leader index shards from the remote cluster. - * Before fetching the index metadata, the remote cluster is checked for license compatibility with CCR. 
- * If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked. Otherwise, - * the specified consumer is invoked with the leader index metadata fetched from the remote cluster. + * Fetches the remote cluster state, leader index metadata, and history UUIDs for leader index shards from + * the remote cluster. Before fetching the index metadata, the remote cluster is checked for license + * compatibility with CCR. If the remote cluster is not licensed for CCR, the {@code onFailure} consumer + * is invoked. Otherwise, the specified consumer is invoked with the cluster state and leader index metadata + * fetched from the remote cluster. * * @param client the client * @param clusterAlias the remote cluster alias + * @param fetchNodes whether this request should fetch the nodes of the remote cluster * @param leaderIndex the name of the leader index * @param onFailure the failure consumer * @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards */ - public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( + public void checkRemoteClusterLicenseAndFetchClusterStateLeaderIndexMetadataAndHistoryUUIDs( final Client client, final String clusterAlias, + final boolean fetchNodes, final String leaderIndex, final Consumer onFailure, - final BiConsumer consumer) { + final BiConsumer> consumer) { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); request.metaData(true); request.indices(leaderIndex); + request.nodes(fetchNodes); checkRemoteClusterLicenseAndFetchClusterState( client, clusterAlias, @@ -134,7 +139,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( hasPrivilegesToFollowIndices(remoteClient, new String[] {leaderIndex}, e -> { if (e == null) { fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, onFailure, historyUUIDs -> - consumer.accept(historyUUIDs, leaderIndexMetaData)); + 
consumer.accept(historyUUIDs, new Tuple<>(remoteClusterState, leaderIndexMetaData))); } else { onFailure.accept(e); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/Pre67PutFollow.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/Pre67PutFollow.java new file mode 100644 index 0000000000000..a0796624de50d --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/Pre67PutFollow.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.ActiveShardsObserver; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.CcrSettings; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import 
org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; + +import java.util.HashMap; +import java.util.Map; + +final class Pre67PutFollow { + + private static final Logger logger = LogManager.getLogger(Pre67PutFollow.class); + + private final Client client; + private final ClusterService clusterService; + private final AllocationService allocationService; + private final ActiveShardsObserver activeShardsObserver; + + Pre67PutFollow(final Client client, final ClusterService clusterService, final AllocationService allocationService, + final ActiveShardsObserver activeShardsObserver) { + this.client = client; + this.clusterService = clusterService; + this.allocationService = allocationService; + this.activeShardsObserver = activeShardsObserver; + } + + void doPre67PutFollow(final PutFollowAction.Request request, final IndexMetaData leaderIndexMetaData, String[] historyUUIDs, + final ActionListener listener) { + ActionListener handler = ActionListener.wrap( + result -> { + if (result) { + initiateFollowing(request, listener); + } else { + listener.onResponse(new PutFollowAction.Response(true, false, false)); + } + }, + listener::onFailure); + // Can't use create index api here, because then index templates can alter the mappings / settings. + // And index templates could introduce settings / mappings that are incompatible with the leader index. 
+ clusterService.submitStateUpdateTask("create_following_index", new AckedClusterStateUpdateTask(request, handler) { + + @Override + protected Boolean newResponse(final boolean acknowledged) { + return acknowledged; + } + + @Override + public ClusterState execute(final ClusterState currentState) throws Exception { + String followIndex = request.getFollowRequest().getFollowerIndex(); + IndexMetaData currentIndex = currentState.metaData().index(followIndex); + if (currentIndex != null) { + throw new ResourceAlreadyExistsException(currentIndex.getIndex()); + } + + MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); + IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex); + + // Adding the leader index uuid for each shard as custom metadata: + Map metadata = new HashMap<>(); + metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs)); + metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID()); + metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetaData.getIndex().getName()); + metadata.put(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY, request.getRemoteCluster()); + imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata); + + // Copy all settings, but overwrite a few settings. 
+ Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(leaderIndexMetaData.getSettings()); + // Overwriting UUID here, because otherwise we can't follow indices in the same cluster + settingsBuilder.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()); + settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followIndex); + settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + settingsBuilder.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); + imdBuilder.settings(settingsBuilder); + + // Copy mappings from leader IMD to follow IMD + for (ObjectObjectCursor cursor : leaderIndexMetaData.getMappings()) { + imdBuilder.putMapping(cursor.value); + } + imdBuilder.setRoutingNumShards(leaderIndexMetaData.getRoutingNumShards()); + IndexMetaData followIMD = imdBuilder.build(); + mdBuilder.put(followIMD, false); + + ClusterState.Builder builder = ClusterState.builder(currentState); + builder.metaData(mdBuilder.build()); + ClusterState updatedState = builder.build(); + + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable()) + .addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowerIndex())); + updatedState = allocationService.reroute( + ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(), + "follow index [" + request.getFollowRequest().getFollowerIndex() + "] created"); + + logger.info("[{}] creating index, cause [ccr_create_and_follow], shards [{}]/[{}]", + followIndex, followIMD.getNumberOfShards(), followIMD.getNumberOfReplicas()); + + return updatedState; + } + }); + } + + private void initiateFollowing(final PutFollowAction.Request request, final ActionListener listener) { + activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()}, + ActiveShardCount.DEFAULT, request.timeout(), result -> { + if (result) { + client.execute(ResumeFollowAction.INSTANCE, 
request.getFollowRequest(), ActionListener.wrap( + r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())), + listener::onFailure + )); + } else { + listener.onResponse(new PutFollowAction.Response(true, false, false)); + } + }, listener::onFailure); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index eaf60a0fb4fa6..515da70719fd7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -6,51 +6,56 @@ package org.elasticsearch.xpack.ccr.action; -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import org.elasticsearch.ResourceAlreadyExistsException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import 
org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.snapshots.RestoreInfo; +import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; +import org.elasticsearch.xpack.ccr.repository.CcrRepository; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import java.util.Objects; public final class TransportPutFollowAction - extends TransportMasterNodeAction { + extends TransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportPutFollowAction.class); private final Client client; - private final AllocationService allocationService; - private final ActiveShardsObserver activeShardsObserver; + private final RestoreService restoreService; private final CcrLicenseChecker ccrLicenseChecker; + private final ActiveShardsObserver activeShardsObserver; + private final 
Pre67PutFollow pre67PutFollow; @Inject public TransportPutFollowAction( @@ -61,6 +66,7 @@ public TransportPutFollowAction( final ActionFilters actionFilters, final IndexNameExpressionResolver indexNameExpressionResolver, final Client client, + final RestoreService restoreService, final AllocationService allocationService, final CcrLicenseChecker ccrLicenseChecker) { super( @@ -73,9 +79,10 @@ public TransportPutFollowAction( PutFollowAction.Request::new, indexNameExpressionResolver); this.client = client; - this.allocationService = allocationService; - this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); + this.restoreService = restoreService; this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker); + this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); + this.pre67PutFollow = new Pre67PutFollow(client, clusterService, allocationService, activeShardsObserver); } @Override @@ -97,7 +104,7 @@ protected PutFollowAction.Response read(StreamInput in) throws IOException { protected void masterOperation( final PutFollowAction.Request request, final ClusterState state, - final ActionListener listener) throws Exception { + final ActionListener listener) { if (ccrLicenseChecker.isCcrAllowed() == false) { listener.onFailure(LicenseUtils.newComplianceException("ccr")); return; @@ -107,19 +114,23 @@ protected void masterOperation( client.getRemoteClusterClient(remoteCluster); String leaderIndex = request.getLeaderIndex(); - ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( + Version minNodeVersion = state.getNodes().getMinNodeVersion(); + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterStateLeaderIndexMetadataAndHistoryUUIDs( client, remoteCluster, + true, leaderIndex, listener::onFailure, - (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener)); + (historyUUID, metaDataTuple) -> 
createFollowerIndex(metaDataTuple, historyUUID, request, listener, minNodeVersion)); } private void createFollowerIndex( - final IndexMetaData leaderIndexMetaData, - final String[] historyUUIDs, + final Tuple metaDataTuple, + final String [] historyUUID, final PutFollowAction.Request request, - final ActionListener listener) { + final ActionListener listener, + final Version localClusterMinNodeVersion) { + IndexMetaData leaderIndexMetaData = metaDataTuple.v2(); if (leaderIndexMetaData == null) { listener.onFailure(new IllegalArgumentException("leader index [" + request.getLeaderIndex() + "] does not exist")); return; @@ -132,98 +143,117 @@ private void createFollowerIndex( return; } - ActionListener handler = ActionListener.wrap( - result -> { - if (result) { - initiateFollowing(request, listener); - } else { - listener.onResponse(new PutFollowAction.Response(true, false, false)); - } - }, - listener::onFailure); - // Can't use create index api here, because then index templates can alter the mappings / settings. - // And index templates could introduce settings / mappings that are incompatible with the leader index. - clusterService.submitStateUpdateTask("create_following_index", new AckedClusterStateUpdateTask(request, handler) { + boolean pre67CompatibilityMode = localClusterMinNodeVersion.before(Version.V_6_7_0) + || metaDataTuple.v1().getNodes().getMinNodeVersion().before(Version.V_6_7_0); - @Override - protected Boolean newResponse(final boolean acknowledged) { - return acknowledged; - } + if (pre67CompatibilityMode) { + logger.debug("Pre-6.7 nodes present in local/remote cluster. Cannot bootstrap from remote. 
Creating empty follower index " + + "[{}] and initiating following [{}, {}].", request.getFollowRequest().getFollowerIndex(), request.getRemoteCluster(), + request.getLeaderIndex()); + pre67PutFollow.doPre67PutFollow(request, leaderIndexMetaData, historyUUID, listener); + } else { + final Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowerIndex()) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster(); + final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST) + .indices(request.getLeaderIndex()).indicesOptions(request.indicesOptions()).renamePattern("^(.*)$") + .renameReplacement(request.getFollowRequest().getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout()) + .indexSettings(settingsBuilder); - @Override - public ClusterState execute(final ClusterState currentState) throws Exception { - String followIndex = request.getFollowRequest().getFollowerIndex(); - IndexMetaData currentIndex = currentState.metaData().index(followIndex); - if (currentIndex != null) { - throw new ResourceAlreadyExistsException(currentIndex.getIndex()); + final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders()); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + restoreService.restoreSnapshot(restoreRequest, new ActionListener() { + + @Override + public void onResponse(RestoreService.RestoreCompletionResponse response) { + afterRestoreStarted(clientWithHeaders, request, listener, response); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } + }); + } + } + 
+ private void afterRestoreStarted(Client clientWithHeaders, PutFollowAction.Request request, + ActionListener originalListener, + RestoreService.RestoreCompletionResponse response) { + final ActionListener listener; + if (ActiveShardCount.NONE.equals(request.waitForActiveShards())) { + originalListener.onResponse(new PutFollowAction.Response(true, false, false)); + listener = new ActionListener() { - MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); - IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex); - - // Adding the leader index uuid for each shard as custom metadata: - Map metadata = new HashMap<>(); - metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs)); - metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID()); - metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetaData.getIndex().getName()); - metadata.put(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY, request.getRemoteCluster()); - imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata); - - // Copy all settings, but overwrite a few settings. 
- Settings.Builder settingsBuilder = Settings.builder(); - settingsBuilder.put(leaderIndexMetaData.getSettings()); - // Overwriting UUID here, because otherwise we can't follow indices in the same cluster - settingsBuilder.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()); - settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followIndex); - settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); - settingsBuilder.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); - imdBuilder.settings(settingsBuilder); - - // Copy mappings from leader IMD to follow IMD - for (ObjectObjectCursor cursor : leaderIndexMetaData.getMappings()) { - imdBuilder.putMapping(cursor.value); + @Override + public void onResponse(PutFollowAction.Response response) { + logger.debug("put follow {} completed with {}", request, response); } - imdBuilder.setRoutingNumShards(leaderIndexMetaData.getRoutingNumShards()); - IndexMetaData followIMD = imdBuilder.build(); - mdBuilder.put(followIMD, false); - ClusterState.Builder builder = ClusterState.builder(currentState); - builder.metaData(mdBuilder.build()); - ClusterState updatedState = builder.build(); + @Override + public void onFailure(Exception e) { + logger.debug(() -> new ParameterizedMessage("put follow {} failed during the restore process", request), e); + } + }; + } else { + listener = originalListener; + } - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable()) - .addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowerIndex())); - updatedState = allocationService.reroute( - ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(), - "follow index [" + request.getFollowRequest().getFollowerIndex() + "] created"); + RestoreClusterStateListener.createAndRegisterListener(clusterService, response, new ActionListener() { + @Override + public void onResponse(RestoreSnapshotResponse restoreSnapshotResponse) { 
+ RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo(); - logger.info("[{}] creating index, cause [ccr_create_and_follow], shards [{}]/[{}]", - followIndex, followIMD.getNumberOfShards(), followIMD.getNumberOfReplicas()); + if (restoreInfo == null) { + // If restoreInfo is null then it is possible there was a master failure during the + // restore. + listener.onResponse(new PutFollowAction.Response(true, false, false)); + } else if (restoreInfo.failedShards() == 0) { + initiateFollowing(clientWithHeaders, request, listener); + } else { + assert restoreInfo.failedShards() > 0 : "Should have failed shards"; + listener.onResponse(new PutFollowAction.Response(true, false, false)); + } + } - return updatedState; + @Override + public void onFailure(Exception e) { + listener.onFailure(e); } }); } private void initiateFollowing( - final PutFollowAction.Request request, - final ActionListener listener) { + final Client client, + final PutFollowAction.Request request, + final ActionListener listener) { + assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT."; activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()}, - ActiveShardCount.DEFAULT, request.timeout(), result -> { - if (result) { - client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap( - r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())), - listener::onFailure - )); - } else { - listener.onResponse(new PutFollowAction.Response(true, false, false)); - } - }, listener::onFailure); + request.waitForActiveShards(), request.timeout(), result -> { + if (result) { + client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap( + r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())), + listener::onFailure + )); + } else { + listener.onResponse(new 
PutFollowAction.Response(true, false, false)); + } + }, listener::onFailure); } @Override protected ClusterBlockException checkBlock(final PutFollowAction.Request request, final ClusterState state) { return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex()); } - } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 165e8fb2345ce..67d5d7ffd74d3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -133,14 +133,15 @@ protected void masterOperation(final ResumeFollowAction.Request request, // Validates whether the leader cluster has been configured properly: client.getRemoteClusterClient(leaderCluster); final String leaderIndex = ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY); - ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterStateLeaderIndexMetadataAndHistoryUUIDs( client, leaderCluster, + false, leaderIndex, listener::onFailure, - (leaderHistoryUUID, leaderIndexMetadata) -> { + (leaderHistoryUUID, metaDataTuple) -> { try { - start(request, leaderCluster, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener); + start(request, leaderCluster, metaDataTuple.v2(), followerIndexMetadata, leaderHistoryUUID, listener); } catch (final IOException e) { listener.onFailure(e); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 539c19ca8501f..a1b601f352583 100644 --- 
a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.ccr.repository; +import com.carrotsearch.hppc.cursors.IntObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; @@ -81,6 +83,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit public static final String TYPE = "_ccr_"; public static final String NAME_PREFIX = "_ccr_"; private static final SnapshotId SNAPSHOT_ID = new SnapshotId(LATEST, LATEST); + private static final String IN_SYNC_ALLOCATION_ID = "ccr_restore"; private final RepositoryMetaData metadata; private final CcrSettings ccrSettings; @@ -160,7 +163,7 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse); String[] leaderHistoryUUIDs = future.actionGet(ccrSettings.getRecoveryActionTimeout()); - IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndexMetaData); + IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndex); // Adding the leader index uuid for each shard as custom metadata: Map metadata = new HashMap<>(); metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", leaderHistoryUUIDs)); @@ -169,6 +172,19 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind metadata.put(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY, remoteClusterAlias); imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata); + imdBuilder.settings(leaderIndexMetaData.getSettings()); + + // Copy mappings from leader IMD to follow IMD + for (ObjectObjectCursor cursor : 
leaderIndexMetaData.getMappings()) { + imdBuilder.putMapping(cursor.value); + } + + imdBuilder.setRoutingNumShards(leaderIndexMetaData.getRoutingNumShards()); + // We assert that insync allocation ids are not empty in `PrimaryShardAllocator` + for (IntObjectCursor> entry : leaderIndexMetaData.getInSyncAllocationIds()) { + imdBuilder.putInSyncAllocationIds(entry.key, Collections.singleton(IN_SYNC_ALLOCATION_ID)); + } + return imdBuilder.build(); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutFollowAction.java index 7b21422cb9867..d7a2edd21d26f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutFollowAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.rest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; @@ -38,7 +39,8 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient static Request createRequest(RestRequest restRequest) throws IOException { try (XContentParser parser = restRequest.contentOrSourceParamParser()) { - return Request.fromXContent(parser, restRequest.param("index")); + ActiveShardCount waitForActiveShards = ActiveShardCount.parseString(restRequest.param("wait_for_active_shards")); + return Request.fromXContent(parser, restRequest.param("index"), waitForActiveShards); } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 771ee2e0338d2..40e9bcb1499e4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ 
b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; @@ -283,8 +284,13 @@ protected final ClusterHealthStatus ensureLeaderGreen(String... indices) { } protected final ClusterHealthStatus ensureFollowerGreen(String... indices) { + return ensureFollowerGreen(false, indices); + } + + protected final ClusterHealthStatus ensureFollowerGreen(boolean waitForNoInitializingShards, String... indices) { logger.info("ensure green follower indices {}", Arrays.toString(indices)); - return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30), false, indices); + return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30), + waitForNoInitializingShards, indices); } private ClusterHealthStatus ensureColor(TestCluster testCluster, @@ -417,10 +423,15 @@ protected String getIndexSettings(final int numberOfShards, final int numberOfRe } public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) { + return putFollow(leaderIndex, followerIndex, ActiveShardCount.ONE); + } + + public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex, ActiveShardCount waitForActiveShards) { PutFollowAction.Request request = new PutFollowAction.Request(); request.setRemoteCluster("leader_cluster"); request.setLeaderIndex(leaderIndex); request.setFollowRequest(resumeFollow(followerIndex)); + request.waitForActiveShards(waitForActiveShards); return request; } diff --git 
a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index ad8f545fa9dc0..48531c7d28f9a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -98,6 +99,7 @@ protected PutFollowAction.Request getPutFollowRequest(String leaderIndex, String request.setRemoteCluster("local"); request.setLeaderIndex(leaderIndex); request.setFollowRequest(getResumeFollowRequest(followerIndex)); + request.waitForActiveShards(ActiveShardCount.ONE); return request; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 78d8af1592c8c..a1da1e240d70e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -8,6 +8,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; @@ -30,8 +31,10 @@ import org.elasticsearch.action.index.IndexRequest; import 
org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterIndexHealth; +import org.elasticsearch.cluster.health.ClusterShardHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; @@ -92,14 +95,12 @@ public class IndexFollowingIT extends CcrIntegTestCase { public void testFollowIndex() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); - final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + int numberOfReplicas = between(0, 1); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, numberOfReplicas, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureLeaderYellow("index1"); - final PutFollowAction.Request followRequest = putFollow("index1", "index2"); - followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); - final int firstBatchNumDocs = randomIntBetween(2, 64); logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); for (int i = 0; i < firstBatchNumDocs; i++) { @@ -107,6 +108,30 @@ public void testFollowIndex() throws Exception { leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); } + boolean waitOnAll = randomBoolean(); + + final PutFollowAction.Request followRequest; + if (waitOnAll) { + followRequest = putFollow("index1", "index2", ActiveShardCount.ALL); + } else { + followRequest = putFollow("index1", "index2", ActiveShardCount.ONE); + } + PutFollowAction.Response response = 
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertTrue(response.isFollowIndexCreated()); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + + ClusterHealthRequest healthRequest = Requests.clusterHealthRequest("index2").waitForNoRelocatingShards(true); + ClusterIndexHealth indexHealth = followerClient().admin().cluster().health(healthRequest).actionGet().getIndices().get("index2"); + for (ClusterShardHealth shardHealth : indexHealth.getShards().values()) { + if (waitOnAll) { + assertTrue(shardHealth.isPrimaryActive()); + assertEquals(1 + numberOfReplicas, shardHealth.getActiveShards()); + } else { + assertTrue(shardHealth.isPrimaryActive()); + } + } + final Map firstBatchNumDocsPerShard = new HashMap<>(); final ShardStats[] firstBatchShardStats = leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); @@ -153,6 +178,119 @@ public void testFollowIndex() throws Exception { assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); } + public void testFollowIndexWithConcurrentMappingChanges() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + final int firstBatchNumDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); + for (int i = 0; i < firstBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + AtomicBoolean isRunning = new 
AtomicBoolean(true); + + // Concurrently index new docs with mapping changes + Thread thread = new Thread(() -> { + int docID = 10000; + char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray(); + for (char c : chars) { + if (isRunning.get() == false) { + break; + } + final String source; + long valueToPutInDoc = randomLongBetween(0, 50000); + if (randomBoolean()) { + source = String.format(Locale.ROOT, "{\"%c\":%d}", c, valueToPutInDoc); + } else { + source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, valueToPutInDoc); + } + for (int i = 1; i < 10; i++) { + if (isRunning.get() == false) { + break; + } + leaderClient().prepareIndex("index1", "doc", Long.toString(docID++)).setSource(source, XContentType.JSON).get(); + if (rarely()) { + leaderClient().admin().indices().prepareFlush("index1").setForce(true).get(); + } + } + leaderClient().admin().indices().prepareFlush("index1").setForce(true).setWaitIfOngoing(true).get(); + } + }); + thread.start(); + + final PutFollowAction.Request followRequest = putFollow("index1", "index2", ActiveShardCount.NONE); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen("index2"); + + for (int i = 0; i < firstBatchNumDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + + final int secondBatchNumDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); + for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + + isRunning.set(false); + thread.join(); + } + + public void testFollowIndexWithoutWaitForComplete() throws Exception { + final int numberOfPrimaryShards = 
randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + final int firstBatchNumDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); + for (int i = 0; i < firstBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + final PutFollowAction.Request followRequest = putFollow("index1", "index2", ActiveShardCount.NONE); + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + assertTrue(response.isFollowIndexCreated()); + assertFalse(response.isFollowIndexShardsAcked()); + assertFalse(response.isIndexFollowingStarted()); + + // Check that the index exists, would throw index not found exception if the index is missing + followerClient().admin().indices().prepareGetIndex().addIndices("index2").get(); + ensureFollowerGreen(true, "index2"); + + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); + + for (int i = 0; i < firstBatchNumDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + 
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs); + pauseFollow("index2"); + } + public void testSyncMappings() throws Exception { final String leaderIndexSettings = getIndexSettings(2, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index c508ab0ca7e0a..9d8e0146bfc51 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -39,14 +39,14 @@ public void testFollowIndex() throws Exception { assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); ensureGreen("leader"); - final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower"); - client().execute(PutFollowAction.INSTANCE, followRequest).get(); - final long firstBatchNumDocs = randomIntBetween(2, 64); for (int i = 0; i < firstBatchNumDocs; i++) { client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); } + final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertBusy(() -> { assertThat(client().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs)); }); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java index 726a1c9893a50..d32a773ebe218 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java +++ 
b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -30,7 +31,7 @@ protected PutFollowAction.Request createTestInstance() { @Override protected PutFollowAction.Request doParseInstance(XContentParser parser) throws IOException { - return PutFollowAction.Request.fromXContent(parser, null); + return PutFollowAction.Request.fromXContent(parser, null, ActiveShardCount.DEFAULT); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java index 856f3e29f840d..b5c76debac148 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PutFollowAction.java @@ -6,11 +6,13 @@ package org.elasticsearch.xpack.core.ccr.action; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.client.ElasticsearchClient; @@ -30,10 +32,10 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.FOLLOWER_INDEX_FIELD; -import static 
org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_OPERATION_COUNT; -import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_SIZE; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_READ_REQUESTS; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_WRITE_REQUESTS; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_OPERATION_COUNT; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_SIZE; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_COUNT; import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE; @@ -110,7 +112,8 @@ public static class Request extends AcknowledgedRequest implements Indi ObjectParser.ValueType.STRING); } - public static Request fromXContent(final XContentParser parser, final String followerIndex) throws IOException { + public static Request fromXContent(final XContentParser parser, final String followerIndex, ActiveShardCount waitForActiveShards) + throws IOException { Request request = PARSER.parse(parser, followerIndex); if (followerIndex != null) { if (request.getFollowRequest().getFollowerIndex() == null) { @@ -121,11 +124,13 @@ public static Request fromXContent(final XContentParser parser, final String fol } } } + request.waitForActiveShards(waitForActiveShards); return request; } private String remoteCluster; private String leaderIndex; + private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE; private ResumeFollowAction.Request followRequest; public Request() { @@ -147,6 +152,27 @@ public void setLeaderIndex(String leaderIndex) { this.leaderIndex = leaderIndex; } + public 
ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } + + /** + * Sets the number of shard copies that should be active for follower index creation to + * return. Defaults to {@link ActiveShardCount#NONE}, which will not wait for any shards + * to be active; {@link ActiveShardCount#DEFAULT} is treated as {@link ActiveShardCount#NONE}. + * Set this value to {@link ActiveShardCount#ONE} to wait for the primary shard to be active, + * or to {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas). + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public void waitForActiveShards(ActiveShardCount waitForActiveShards) { + if (waitForActiveShards.equals(ActiveShardCount.DEFAULT)) { + this.waitForActiveShards = ActiveShardCount.NONE; + } else { + this.waitForActiveShards = waitForActiveShards; + } + } + public ResumeFollowAction.Request getFollowRequest() { return followRequest; } @@ -181,6 +207,9 @@ public Request(StreamInput in) throws IOException { super(in); remoteCluster = in.readString(); leaderIndex = in.readString(); + if (in.getVersion().onOrAfter(Version.V_6_7_0)) { + waitForActiveShards(ActiveShardCount.readFrom(in)); + } followRequest = new ResumeFollowAction.Request(in); } @@ -189,6 +218,9 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(remoteCluster); out.writeString(leaderIndex); + if (out.getVersion().onOrAfter(Version.V_6_7_0)) { + waitForActiveShards.writeTo(out); + } followRequest.writeTo(out); } @@ -211,12 +243,23 @@ public boolean equals(Object o) { Request request = (Request) o; return Objects.equals(remoteCluster, request.remoteCluster) && Objects.equals(leaderIndex, request.leaderIndex) && + Objects.equals(waitForActiveShards, request.waitForActiveShards) && Objects.equals(followRequest, request.followRequest); } @Override public int hashCode() { - return Objects.hash(remoteCluster, leaderIndex, followRequest); + return Objects.hash(remoteCluster,
leaderIndex, waitForActiveShards, followRequest); + } + + @Override + public String toString() { + return "PutFollowAction.Request{" + + "remoteCluster='" + remoteCluster + '\'' + + ", leaderIndex='" + leaderIndex + '\'' + + ", waitForActiveShards=" + waitForActiveShards + + ", followRequest=" + followRequest + + '}'; } } @@ -285,6 +328,15 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(followIndexCreated, followIndexShardsAcked, indexFollowingStarted); } + + @Override + public String toString() { + return "PutFollowAction.Response{" + + "followIndexCreated=" + followIndexCreated + + ", followIndexShardsAcked=" + followIndexShardsAcked + + ", indexFollowingStarted=" + indexFollowingStarted + + '}'; + } } public static class RequestBuilder extends ActionRequestBuilder { diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java index 65baeb5f168c4..f8ffce9cd817a 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java @@ -61,6 +61,7 @@ public void testBasicCCRAndILMIntegration() throws Exception { // Policy with the same name must exist in follower cluster too: putILMPolicy(policyName, "50GB", null, TimeValue.timeValueHours(7*24)); followIndex(indexName, indexName); + ensureGreen(indexName); // Aliases are not copied from leader index, so we need to add that for the rollover action in follower cluster: client().performRequest(new Request("PUT", "/" + indexName + "/_alias/logs")); @@ -116,6 +117,7 @@ public void testCCRUnfollowDuringSnapshot() throws Exception { } else if ("follow".equals(targetCluster)) { createNewSingletonPolicy("unfollow-only", "hot", new 
UnfollowAction(), TimeValue.ZERO); followIndex(indexName, indexName); + ensureGreen(indexName); // Create the repository before taking the snapshot. Request request = new Request("PUT", "/_snapshot/repo"); @@ -210,7 +212,7 @@ public void testCcrAndIlmWithRollover() throws Exception { "\"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}}, " + "\"aliases\": {\"" + alias + "\": {\"is_write_index\": true}} }"); assertOK(leaderClient.performRequest(createIndexRequest)); - // Check that the new index is creeg + // Check that the new index is created Request checkIndexRequest = new Request("GET", "/_cluster/health/" + indexName); checkIndexRequest.addParameter("wait_for_status", "green"); checkIndexRequest.addParameter("timeout", "70s"); @@ -226,6 +228,7 @@ public void testCcrAndIlmWithRollover() throws Exception { index(leaderClient, indexName, "1"); assertDocumentExists(leaderClient, indexName, "1"); + ensureGreen(indexName); assertBusy(() -> { assertDocumentExists(client(), indexName, "1"); // Sanity check that following_index setting has been set, so that we can verify later that this setting has been unset: diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.follow.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.follow.json index 635a4e62683bb..588dd60261252 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.follow.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.follow.json @@ -11,6 +11,13 @@ "required": true, "description": "The name of the follower index" } + }, + "params": { + "wait_for_active_shards": { + "type" : "string", + "description" : "Sets the number of shard copies that must be active before returning. Defaults to 0. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)", + "default": "0" + } } }, "body": {