Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use CcrRepository to init follower index #35719

Merged
merged 108 commits into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from 100 commits
Commits
Show all changes
108 commits
Select commit Hold shift + click to select a range
22589ef
WIP
Tim-Brooks Nov 13, 2018
37aa475
WIP
Tim-Brooks Nov 13, 2018
4da9aa8
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 13, 2018
ad65ffa
WIP
Tim-Brooks Nov 13, 2018
af6fbd2
Fix indention
Tim-Brooks Nov 13, 2018
63cc5cf
WIP
Tim-Brooks Nov 14, 2018
ddbe0ce
WIP
Tim-Brooks Nov 14, 2018
dfe7561
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 14, 2018
c235a02
WIP
Tim-Brooks Nov 15, 2018
bdb0a57
Work on implementing private restore API
Tim-Brooks Nov 15, 2018
d6c0c39
WIP
Tim-Brooks Nov 16, 2018
9b09b56
WIP
Tim-Brooks Nov 16, 2018
45d0658
WIP
Tim-Brooks Nov 16, 2018
465a0d6
WIP
Tim-Brooks Nov 17, 2018
105fb0f
WIP
Tim-Brooks Nov 19, 2018
01909a5
Get basic tests
Tim-Brooks Nov 19, 2018
5f3ea03
Automatically creat repo
Tim-Brooks Nov 19, 2018
7096eb1
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 19, 2018
d93c3d7
WIP
Tim-Brooks Nov 19, 2018
c6cd205
Cleanups
Tim-Brooks Nov 19, 2018
bb548f2
checkstyle
Tim-Brooks Nov 20, 2018
1f9dcc8
Teardown repo
Tim-Brooks Nov 20, 2018
99cf6dc
Add comment
Tim-Brooks Nov 20, 2018
4c9162c
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 20, 2018
025edf1
Fix license
Tim-Brooks Nov 20, 2018
a3b2fe4
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 20, 2018
5b52303
Fix test
Tim-Brooks Nov 20, 2018
f690c9b
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 20, 2018
b075faa
Cleanups
Tim-Brooks Nov 20, 2018
12b6ed5
Cleanups
Tim-Brooks Nov 20, 2018
fa7016d
Fix
Tim-Brooks Nov 20, 2018
74ea309
Remove extra setting
Tim-Brooks Nov 21, 2018
66a749e
WIP
Tim-Brooks Nov 21, 2018
dd99b08
WIP
Tim-Brooks Nov 21, 2018
4c1928a
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 30, 2018
1734ad8
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 4, 2018
ebf8609
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 4, 2018
9073151
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 4, 2018
d43dc41
Update work
Tim-Brooks Dec 4, 2018
d34efc3
Fix some tests
Tim-Brooks Dec 5, 2018
4bf5909
Fix checkstyle
Tim-Brooks Dec 5, 2018
2b4ab70
Mute test
Tim-Brooks Dec 5, 2018
f52defd
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 15, 2018
40bd59f
Catch
Tim-Brooks Dec 15, 2018
8508beb
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 17, 2018
6e3ff69
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 18, 2018
53a2305
Cleanup unused
Tim-Brooks Dec 18, 2018
a89902a
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 18, 2018
f99d64a
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 15, 2019
d56a382
WIP
Tim-Brooks Jan 15, 2019
3f3afcb
WIP
Tim-Brooks Jan 15, 2019
aee4c42
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 15, 2019
1970f27
Fix tests
Tim-Brooks Jan 15, 2019
2ff72b8
Revert
Tim-Brooks Jan 16, 2019
3b5893b
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 16, 2019
93a0f6a
Fix
Tim-Brooks Jan 16, 2019
221fd22
Wait on active shards
Tim-Brooks Jan 16, 2019
3725471
Changes
Tim-Brooks Jan 16, 2019
bdf7533
WIP
Tim-Brooks Jan 16, 2019
c25daf5
Fix checkstyle
Tim-Brooks Jan 16, 2019
db2218f
Fix test
Tim-Brooks Jan 17, 2019
413ccc7
Fix tests
Tim-Brooks Jan 17, 2019
f3e9792
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 17, 2019
267088b
Changes
Tim-Brooks Jan 17, 2019
68d7401
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 18, 2019
a98ab13
Changes
Tim-Brooks Jan 18, 2019
2032be5
Changes
Tim-Brooks Jan 18, 2019
f10b43b
Changes
Tim-Brooks Jan 18, 2019
9c37120
Fix whitespace
Tim-Brooks Jan 18, 2019
d128082
Changes
Tim-Brooks Jan 18, 2019
97337f8
Changes
Tim-Brooks Jan 18, 2019
e1a58eb
Fix
Tim-Brooks Jan 18, 2019
e157f52
Fix tests
Tim-Brooks Jan 18, 2019
c9ee981
Fix
Tim-Brooks Jan 18, 2019
107ddfd
Change rest api
Tim-Brooks Jan 19, 2019
95bbc84
Fix
Tim-Brooks Jan 19, 2019
d8439ff
Improve test
Tim-Brooks Jan 19, 2019
a568d3d
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 24, 2019
80e0548
Changes
Tim-Brooks Jan 24, 2019
d5ec701
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 24, 2019
cc14be5
Change
Tim-Brooks Jan 24, 2019
aab2aa1
Fix test
Tim-Brooks Jan 24, 2019
a291142
Fix test
Tim-Brooks Jan 24, 2019
d670028
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 24, 2019
d25401e
Fix test
Tim-Brooks Jan 24, 2019
ebf812d
Changes
Tim-Brooks Jan 25, 2019
f4e0ca8
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 25, 2019
cc64413
Changes
Tim-Brooks Jan 25, 2019
acf3ebe
Changes
Tim-Brooks Jan 25, 2019
bb6402a
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 25, 2019
c5c2d40
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 25, 2019
065ab72
Do not pull all metadata
Tim-Brooks Jan 25, 2019
d91cf24
Listener changes
Tim-Brooks Jan 25, 2019
6232914
WIP
Tim-Brooks Jan 25, 2019
9da1fb4
Changes
Tim-Brooks Jan 25, 2019
eb819d2
Changes
Tim-Brooks Jan 25, 2019
b8d9215
Changes
Tim-Brooks Jan 25, 2019
a0b89e0
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 25, 2019
5fdcd6a
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 26, 2019
297280e
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 26, 2019
59f3d98
Change
Tim-Brooks Jan 28, 2019
e4e87f2
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 28, 2019
1a2177f
Changes
Tim-Brooks Jan 28, 2019
0b526e4
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 28, 2019
a630a4d
Fix test
Tim-Brooks Jan 28, 2019
5385fb2
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 28, 2019
fceaf22
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 29, 2019
9519a2e
Changes
Tim-Brooks Jan 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ static Request putFollow(PutFollowRequest putFollowRequest) throws IOException {
.addPathPartAsIs("_ccr", "follow")
.build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint);
RequestConverters.Params parameters = new RequestConverters.Params(request);
parameters.withWaitForActiveShards(putFollowRequest.waitForActiveShards());
request.setEntity(createEntity(putFollowRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.client.ccr;

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContentObject;
Expand All @@ -36,11 +37,17 @@ public final class PutFollowRequest extends FollowConfig implements Validatable,
private final String remoteCluster;
private final String leaderIndex;
private final String followerIndex;
private final ActiveShardCount waitForActiveShards;

public PutFollowRequest(String remoteCluster, String leaderIndex, String followerIndex) {
this(remoteCluster, leaderIndex, followerIndex, ActiveShardCount.NONE);
}

public PutFollowRequest(String remoteCluster, String leaderIndex, String followerIndex, ActiveShardCount waitForActiveShards) {
this.remoteCluster = Objects.requireNonNull(remoteCluster, "remoteCluster");
this.leaderIndex = Objects.requireNonNull(leaderIndex, "leaderIndex");
this.followerIndex = Objects.requireNonNull(followerIndex, "followerIndex");
this.waitForActiveShards = waitForActiveShards;
}

@Override
Expand All @@ -66,13 +73,18 @@ public String getFollowerIndex() {
return followerIndex;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
PutFollowRequest that = (PutFollowRequest) o;
return Objects.equals(remoteCluster, that.remoteCluster) &&
return Objects.equals(waitForActiveShards, that.waitForActiveShards) &&
Objects.equals(remoteCluster, that.remoteCluster) &&
Objects.equals(leaderIndex, that.leaderIndex) &&
Objects.equals(followerIndex, that.followerIndex);
}
Expand All @@ -83,7 +95,7 @@ public int hashCode() {
super.hashCode(),
remoteCluster,
leaderIndex,
followerIndex
);
followerIndex,
waitForActiveShards);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.ccr.CcrStatsRequest;
import org.elasticsearch.client.ccr.CcrStatsResponse;
Expand Down Expand Up @@ -95,7 +96,7 @@ public void testIndexFollowing() throws Exception {
CreateIndexResponse response = highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
assertThat(response.isAcknowledged(), is(true));

PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower");
PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower", ActiveShardCount.ONE);
PutFollowResponse putFollowResponse = execute(putFollowRequest, ccrClient::putFollow, ccrClient::putFollowAsync);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
Expand Down Expand Up @@ -97,7 +98,8 @@ public void testPutFollow() throws Exception {
PutFollowRequest putFollowRequest = new PutFollowRequest(
"local", // <1>
"leader", // <2>
"follower" // <3>
"follower", // <3>
ActiveShardCount.ONE // <4>
);
// end::ccr-put-follow-request

Expand Down Expand Up @@ -175,7 +177,7 @@ public void testPauseFollow() throws Exception {
String followIndex = "follower";
// Follow index, so that it can be paused:
{
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE);
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
Expand Down Expand Up @@ -241,7 +243,7 @@ public void testResumeFollow() throws Exception {
String followIndex = "follower";
// Follow index, so that it can be paused:
{
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE);
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
Expand Down Expand Up @@ -317,7 +319,7 @@ public void testUnfollow() throws Exception {
String followIndex = "follower";
// Follow index, pause and close, so that it can be unfollowed:
{
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE);
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
Expand Down Expand Up @@ -349,7 +351,7 @@ public void testUnfollow() throws Exception {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(followIndex);
assertThat(client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT).isAcknowledged(), is(true));

PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE);
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
Expand Down Expand Up @@ -639,7 +641,7 @@ public void testGetFollowStats() throws Exception {
}
{
// Follow index, so that we can query for follow stats:
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower");
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower", ActiveShardCount.ONE);
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
Expand Down
2 changes: 2 additions & 0 deletions docs/java-rest/high-level/ccr/put_follow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ include-tagged::{doc-tests-file}[{api}-request]
<1> The name of the remote cluster alias.
<2> The name of the leader in the remote cluster.
<3> The name of the follower index that gets created as part of the put follow API call.
<4> The number of active shard copies to wait for before the put follow API returns a
response, as an `ActiveShardCount`

[id="{upid}-{api}-response"]
==== Response
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/ccr/apis/follow/get-follow-info.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ replication options and whether the follower indices are active or paused.

[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/ccr/apis/follow/get-follow-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ following tasks associated with each shard for the specified indices.

[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/ccr/apis/follow/post-pause-follow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ following task.

[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/ccr/apis/follow/post-resume-follow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ returns, the follower index will resume fetching operations from the leader inde

[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/ccr/apis/follow/post-unfollow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ irreversible operation.

[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"
Expand Down
9 changes: 7 additions & 2 deletions docs/reference/ccr/apis/follow/put-follow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ POST /follower_index/_ccr/pause_follow

[source,js]
--------------------------------------------------
PUT /<follower_index>/_ccr/follow
PUT /<follower_index>/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "<remote_cluster>",
"leader_index" : "<leader_index>"
Expand All @@ -43,6 +43,11 @@ PUT /<follower_index>/_ccr/follow
// TEST[s/<remote_cluster>/remote_cluster/]
// TEST[s/<leader_index>/leader_index/]

The `wait_for_active_shards` parameter specifies the number of shards to wait on being active
before responding. This defaults to waiting on none of the shards to be active. A shard must
be restored from the leader index being active. Restoring a follower shard requires transferring
all the remote lucene segment files to the follower index.
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved

==== Path Parameters

`follower_index` (required)::
Expand Down Expand Up @@ -73,7 +78,7 @@ This example creates a follower index named `follower_index`:

[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index",
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/ccr/apis/get-ccr-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ shard-level stats as in the <<ccr-get-follow-stats,get follower stats API>>.

[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/ccr/getting-started.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ cluster.

[source,js]
--------------------------------------------------
PUT /server-metrics-copy/_ccr/follow
PUT /server-metrics-copy/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "leader",
"leader_index" : "server-metrics"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.snapshots.restore;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;

import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;

public class RestoreClusterStateListener implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(RestoreClusterStateListener.class);

private final ClusterService clusterService;
private final String uuid;
private final ActionListener<RestoreSnapshotResponse> listener;


private RestoreClusterStateListener(ClusterService clusterService, RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener) {
this.clusterService = clusterService;
this.uuid = response.getUuid();
this.listener = listener;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps do the the addListener call as well here so that it's more standalone?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I aded a static method that creates the class and registers it. I'm not wild about creating classes just for the side-effects in the ctor.

}

@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse(null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards));
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
}
}

/**
* Creates a cluster state listener and registers it with the cluster service. The listener passed as a
* parameter will be called when the restore is complete.
*/
public static void createAndRegisterListener(ClusterService clusterService, RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener) {
clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener));
}
}
Loading