Use CcrRepository to init follower index #35719

Merged
merged 108 commits into from Jan 29, 2019
Changes from 64 commits
108 commits
22589ef
WIP
Tim-Brooks Nov 13, 2018
37aa475
WIP
Tim-Brooks Nov 13, 2018
4da9aa8
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 13, 2018
ad65ffa
WIP
Tim-Brooks Nov 13, 2018
af6fbd2
Fix indention
Tim-Brooks Nov 13, 2018
63cc5cf
WIP
Tim-Brooks Nov 14, 2018
ddbe0ce
WIP
Tim-Brooks Nov 14, 2018
dfe7561
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 14, 2018
c235a02
WIP
Tim-Brooks Nov 15, 2018
bdb0a57
Work on implementing private restore API
Tim-Brooks Nov 15, 2018
d6c0c39
WIP
Tim-Brooks Nov 16, 2018
9b09b56
WIP
Tim-Brooks Nov 16, 2018
45d0658
WIP
Tim-Brooks Nov 16, 2018
465a0d6
WIP
Tim-Brooks Nov 17, 2018
105fb0f
WIP
Tim-Brooks Nov 19, 2018
01909a5
Get basic tests
Tim-Brooks Nov 19, 2018
5f3ea03
Automatically creat repo
Tim-Brooks Nov 19, 2018
7096eb1
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 19, 2018
d93c3d7
WIP
Tim-Brooks Nov 19, 2018
c6cd205
Cleanups
Tim-Brooks Nov 19, 2018
bb548f2
checkstyle
Tim-Brooks Nov 20, 2018
1f9dcc8
Teardown repo
Tim-Brooks Nov 20, 2018
99cf6dc
Add comment
Tim-Brooks Nov 20, 2018
4c9162c
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 20, 2018
025edf1
Fix license
Tim-Brooks Nov 20, 2018
a3b2fe4
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 20, 2018
5b52303
Fix test
Tim-Brooks Nov 20, 2018
f690c9b
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 20, 2018
b075faa
Cleanups
Tim-Brooks Nov 20, 2018
12b6ed5
Cleanups
Tim-Brooks Nov 20, 2018
fa7016d
Fix
Tim-Brooks Nov 20, 2018
74ea309
Remove extra setting
Tim-Brooks Nov 21, 2018
66a749e
WIP
Tim-Brooks Nov 21, 2018
dd99b08
WIP
Tim-Brooks Nov 21, 2018
4c1928a
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Nov 30, 2018
1734ad8
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 4, 2018
ebf8609
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 4, 2018
9073151
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 4, 2018
d43dc41
Update work
Tim-Brooks Dec 4, 2018
d34efc3
Fix some tests
Tim-Brooks Dec 5, 2018
4bf5909
Fix checkstyle
Tim-Brooks Dec 5, 2018
2b4ab70
Mute test
Tim-Brooks Dec 5, 2018
f52defd
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 15, 2018
40bd59f
Catch
Tim-Brooks Dec 15, 2018
8508beb
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 17, 2018
6e3ff69
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 18, 2018
53a2305
Cleanup unused
Tim-Brooks Dec 18, 2018
a89902a
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Dec 18, 2018
f99d64a
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 15, 2019
d56a382
WIP
Tim-Brooks Jan 15, 2019
3f3afcb
WIP
Tim-Brooks Jan 15, 2019
aee4c42
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 15, 2019
1970f27
Fix tests
Tim-Brooks Jan 15, 2019
2ff72b8
Revert
Tim-Brooks Jan 16, 2019
3b5893b
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 16, 2019
93a0f6a
Fix
Tim-Brooks Jan 16, 2019
221fd22
Wait on active shards
Tim-Brooks Jan 16, 2019
3725471
Changes
Tim-Brooks Jan 16, 2019
bdf7533
WIP
Tim-Brooks Jan 16, 2019
c25daf5
Fix checkstyle
Tim-Brooks Jan 16, 2019
db2218f
Fix test
Tim-Brooks Jan 17, 2019
413ccc7
Fix tests
Tim-Brooks Jan 17, 2019
f3e9792
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 17, 2019
267088b
Changes
Tim-Brooks Jan 17, 2019
68d7401
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 18, 2019
a98ab13
Changes
Tim-Brooks Jan 18, 2019
2032be5
Changes
Tim-Brooks Jan 18, 2019
f10b43b
Changes
Tim-Brooks Jan 18, 2019
9c37120
Fix whitespace
Tim-Brooks Jan 18, 2019
d128082
Changes
Tim-Brooks Jan 18, 2019
97337f8
Changes
Tim-Brooks Jan 18, 2019
e1a58eb
Fix
Tim-Brooks Jan 18, 2019
e157f52
Fix tests
Tim-Brooks Jan 18, 2019
c9ee981
Fix
Tim-Brooks Jan 18, 2019
107ddfd
Change rest api
Tim-Brooks Jan 19, 2019
95bbc84
Fix
Tim-Brooks Jan 19, 2019
d8439ff
Improve test
Tim-Brooks Jan 19, 2019
a568d3d
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 24, 2019
80e0548
Changes
Tim-Brooks Jan 24, 2019
d5ec701
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 24, 2019
cc14be5
Change
Tim-Brooks Jan 24, 2019
aab2aa1
Fix test
Tim-Brooks Jan 24, 2019
a291142
Fix test
Tim-Brooks Jan 24, 2019
d670028
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 24, 2019
d25401e
Fix test
Tim-Brooks Jan 24, 2019
ebf812d
Changes
Tim-Brooks Jan 25, 2019
f4e0ca8
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 25, 2019
cc64413
Changes
Tim-Brooks Jan 25, 2019
acf3ebe
Changes
Tim-Brooks Jan 25, 2019
bb6402a
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 25, 2019
c5c2d40
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 25, 2019
065ab72
Do not pull all metadata
Tim-Brooks Jan 25, 2019
d91cf24
Listener changes
Tim-Brooks Jan 25, 2019
6232914
WIP
Tim-Brooks Jan 25, 2019
9da1fb4
Changes
Tim-Brooks Jan 25, 2019
eb819d2
Changes
Tim-Brooks Jan 25, 2019
b8d9215
Changes
Tim-Brooks Jan 25, 2019
a0b89e0
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 25, 2019
5fdcd6a
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 26, 2019
297280e
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 26, 2019
59f3d98
Change
Tim-Brooks Jan 28, 2019
e4e87f2
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 28, 2019
1a2177f
Changes
Tim-Brooks Jan 28, 2019
0b526e4
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 28, 2019
a630a4d
Fix test
Tim-Brooks Jan 28, 2019
5385fb2
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 28, 2019
fceaf22
Merge remote-tracking branch 'upstream/master' into ccr_recover_from_…
Tim-Brooks Jan 29, 2019
9519a2e
Changes
Tim-Brooks Jan 29, 2019
@@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.snapshots.restore;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.Snapshot;

import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;

public class RestoreClusterStateListener implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(RestoreClusterStateListener.class);

private final ClusterService clusterService;
private final String uuid;
private final Snapshot snapshot;
private final ActionListener<RestoreSnapshotResponse> listener;


private RestoreClusterStateListener(ClusterService clusterService, RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener) {
this.clusterService = clusterService;
this.uuid = response.getUuid();
this.snapshot = response.getSnapshot();
this.listener = listener;
Contributor: perhaps do the addListener call as well here so that it's more standalone?

Contributor Author: I added a static method that creates the class and registers it. I'm not wild about creating classes just for the side effects in the ctor.

}

@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse(null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards));
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", snapshot);
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
}
}

/**
* Creates a cluster state listener and registers it with the cluster service. The listener passed as a
* parameter will be called when the restore is complete.
*/
public static void createAndRegisterListener(ClusterService clusterService, RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener) {
clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener));
}
}
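
As resolved in the thread above, registration moved into a static factory method so that callers never interact with a half-registered listener. A minimal sketch of the call site, mirroring the TransportRestoreSnapshotAction diff below (not a complete class):

// Inside the RestoreService completion callback: the restore has started but not
// finished, and the caller asked to wait, so hand completion off to the listener.
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
    // Creates the listener and registers it with the cluster service in one call.
    // The listener removes itself and completes `listener` once the restore entry
    // leaves the cluster state.
    RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, listener);
} else {
    listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
}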
@@ -22,26 +22,17 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;

/**
* Transport action for restore snapshot operation
*/
@@ -91,39 +82,7 @@ protected void masterOperation(final RestoreSnapshotRequest request, final Clust
@Override
public void onResponse(RestoreCompletionResponse restoreCompletionResponse) {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
final Snapshot snapshot = restoreCompletionResponse.getSnapshot();
String uuid = restoreCompletionResponse.getUuid();

ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse(null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards));
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", snapshot);
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
}
}
};

clusterService.addListener(clusterStateListener);
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, listener);
} else {
listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
}
@@ -38,6 +38,7 @@
ccr.follow:
index: bar
body:
wait_for_completion: true
remote_cluster: local
leader_index: foo
- is_true: follow_index_created
@@ -37,6 +37,7 @@
ccr.follow:
index: bar
body:
wait_for_completion: true
remote_cluster: local
leader_index: foo
- is_true: follow_index_created
@@ -38,6 +38,7 @@
ccr.follow:
index: bar
body:
wait_for_completion: true
remote_cluster: local
leader_index: foo
- is_true: follow_index_created
@@ -75,7 +75,7 @@ protected static void followIndex(String leaderCluster, String leaderIndex, Stri
protected static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"remote_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex +
"\", \"read_poll_timeout\": \"10ms\"}");
"\", \"read_poll_timeout\": \"10ms\", \"wait_for_completion\": true}");
assertOK(client.performRequest(request));
}
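
This helper now sends wait_for_completion in the follow request body, so the call returns only after the initial restore from the leader finishes. A minimal standalone sketch of such a request (the index names and the "local" cluster alias are placeholders taken from the YAML tests above):

// Hypothetical example: follow "leader_index" from the remote cluster aliased
// "local" as the local index "follower_index", waiting for initialization.
Request request = new Request("PUT", "/follower_index/_ccr/follow");
request.setJsonEntity("{"
    + "\"remote_cluster\": \"local\", "      // alias of the leader (remote) cluster
    + "\"leader_index\": \"leader_index\", " // index on the leader to replicate
    + "\"wait_for_completion\": true"        // block until the follower index is ready
    + "}");
assertOK(client.performRequest(request));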

@@ -186,6 +186,7 @@ protected static Map<String, Object> toMap(String response)
protected static void ensureYellow(String index) throws IOException {
Request request = new Request("GET", "/_cluster/health/" + index);
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_active_shards", "1");
Contributor: why was this change necessary?

Contributor Author: @martijnvg identified that it was necessary because cluster health can return before the shards are active. It fixed a test that was failing.

request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("wait_for_no_initializing_shards", "true");
request.addParameter("timeout", "70s");
@@ -532,6 +532,7 @@ private void followLeaderIndex(String autoFollowPattenName,
request.setRemoteCluster(remoteCluster);
request.setLeaderIndex(indexToFollow.getName());
request.setFollowRequest(followRequest);
request.setWaitForCompletion(true);
Contributor: why set this to true? This will mean that recordLeaderIndexAsFollowFunction will possibly only be called after a very long time, i.e., after the recovery from the remote cluster is completed. /cc @martijnvg

Contributor Author: I assumed that the auto-follow functionality would expect the shards to be set up and active. But maybe that is not true? @martijnvg can comment here.


// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {