Set timeout of master requests on follower to unbounded (#60070)
Today, a follow task will fail if the master node of the follower
cluster is temporarily overloaded and unable to process master-node
requests (such as updates to mappings, settings, or aliases) from a
follow task within the default timeout. This error is transient, and
follow tasks should not abort because of it. We avoid this problem by
setting the timeout of master-node requests on the follower cluster to
unbounded.

Closes #56891
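
The change is mechanical: master-node requests issued on the follower now carry an effectively infinite timeout instead of the default one (30 seconds by default). A minimal sketch of the pattern, using the masterNodeTimeout setter and the TimeValue.MAX_VALUE constant introduced in this diff (the wrapper class name below is illustrative, not part of the commit):

```java
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.common.unit.TimeValue;

class UnboundedMasterTimeoutSketch {

    // Before: the request fails if the follower's master node cannot
    // process it within the default master-node timeout.
    static ClusterUpdateSettingsRequest bounded() {
        return new ClusterUpdateSettingsRequest();
    }

    // After: an effectively unbounded timeout, so the request waits out
    // transient master overload instead of failing the follow task.
    static ClusterUpdateSettingsRequest unbounded() {
        return new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
    }
}
```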
dnhatn authored Jul 27, 2020
1 parent 6398f73 commit f238d0a
Showing 11 changed files with 108 additions and 48 deletions.
@@ -30,6 +30,7 @@ public class TimeValue implements Comparable<TimeValue> {
 
     public static final TimeValue MINUS_ONE = timeValueMillis(-1);
     public static final TimeValue ZERO = timeValueMillis(0);
+    public static final TimeValue MAX_VALUE = TimeValue.timeValueNanos(Long.MAX_VALUE);
 
     private static final long C0 = 1L;
     private static final long C1 = C0 * 1000L;
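
The new constant is nothing special internally: it is simply the largest duration a TimeValue can represent, which is far longer than any realistic wait. A JDK-only sanity check of the magnitude (class name illustrative):

```java
import java.util.concurrent.TimeUnit;

public class MaxTimeValueMagnitude {
    public static void main(String[] args) {
        // Long.MAX_VALUE nanoseconds is 106751 days, roughly 292 years:
        // as a request timeout this is effectively unbounded.
        System.out.println(TimeUnit.NANOSECONDS.toDays(Long.MAX_VALUE) + " days");
    }
}
```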
@@ -69,7 +69,7 @@ public void testThatRepositoryIsPutAndRemovedWhenRemoteClusterIsUpdated() throws
             fail("need repository");
         }
 
-        ClusterUpdateSettingsRequest putSecondCluster = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest putSecondCluster = newSettingsRequest();
         String address = getFollowerCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
         putSecondCluster.persistentSettings(Settings.builder().put("cluster.remote.follower_cluster_copy.seeds", address));
         assertAcked(followerClient().admin().cluster().updateSettings(putSecondCluster).actionGet());
@@ -83,19 +83,19 @@ public void testThatRepositoryIsPutAndRemovedWhenRemoteClusterIsUpdated() throws
             fail("need repository");
         }
 
-        ClusterUpdateSettingsRequest deleteLeaderCluster = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest deleteLeaderCluster = newSettingsRequest();
         deleteLeaderCluster.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", ""));
         assertAcked(followerClient().admin().cluster().updateSettings(deleteLeaderCluster).actionGet());
 
         expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(leaderClusterRepoName));
 
-        ClusterUpdateSettingsRequest deleteSecondCluster = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest deleteSecondCluster = newSettingsRequest();
         deleteSecondCluster.persistentSettings(Settings.builder().put("cluster.remote.follower_cluster_copy.seeds", ""));
         assertAcked(followerClient().admin().cluster().updateSettings(deleteSecondCluster).actionGet());
 
         expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(followerCopyRepoName));
 
-        ClusterUpdateSettingsRequest putLeaderRequest = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest putLeaderRequest = newSettingsRequest();
         address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
         putLeaderRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address));
         assertAcked(followerClient().admin().cluster().updateSettings(putLeaderRequest).actionGet());
@@ -119,7 +119,7 @@ public void testThatRepositoryRecoversEmptyIndexBasedOnLeaderSettings() throws I
             .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
         RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
             .indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
-            .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
+            .renameReplacement(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE)
             .indexSettings(settingsBuilder);
 
         PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
@@ -160,7 +160,7 @@ public void testThatRepositoryRecoversEmptyIndexBasedOnLeaderSettings() throws I
     }
 
     public void testDocsAreRecovered() throws Exception {
-        ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest settingsRequest = newSettingsRequest();
         String chunkSize = randomFrom("4KB", "128KB", "1MB");
         settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize));
         assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@@ -204,7 +204,7 @@ public void testDocsAreRecovered() throws Exception {
             assertExpectedDocument(followerIndex, i);
         }
 
-        settingsRequest = new ClusterUpdateSettingsRequest();
+        settingsRequest = newSettingsRequest();
         ByteSizeValue defaultValue = CcrSettings.RECOVERY_CHUNK_SIZE.getDefault(Settings.EMPTY);
         settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), defaultValue));
         assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@@ -213,7 +213,7 @@ public void testDocsAreRecovered() throws Exception {
     public void testRateLimitingIsEmployed() throws Exception {
         boolean followerRateLimiting = randomBoolean();
 
-        ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest settingsRequest = newSettingsRequest();
         settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K"));
         if (followerRateLimiting) {
             assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@@ -257,7 +257,7 @@ public void testRateLimitingIsEmployed() throws Exception {
             .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
         RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
             .indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
-            .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
+            .renameReplacement(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE)
             .indexSettings(settingsBuilder);
 
         PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
@@ -270,7 +270,7 @@ public void testRateLimitingIsEmployed() throws Exception {
             assertTrue(restoreSources.stream().anyMatch(cr -> cr.getThrottleTime() > 0));
         }
 
-        settingsRequest = new ClusterUpdateSettingsRequest();
+        settingsRequest = newSettingsRequest();
         ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY);
         settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue));
         if (followerRateLimiting) {
@@ -281,7 +281,7 @@ public void testRateLimitingIsEmployed() throws Exception {
     }
 
     public void testIndividualActionsTimeout() throws Exception {
-        ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest settingsRequest = newSettingsRequest();
         TimeValue timeValue = TimeValue.timeValueMillis(100);
         settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(), timeValue));
         assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@@ -359,6 +359,10 @@ public void testIndividualActionsTimeout() throws Exception {
         }
     }
 
+    private ClusterUpdateSettingsRequest newSettingsRequest() {
+        return new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
+    }
+
     public void testFollowerMappingIsUpdated() throws IOException {
         String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
         String leaderIndex = "index1";
@@ -119,7 +119,7 @@ private RestoreSnapshotRequest setUpRestoreSnapshotRequest(
         final int numberOfReplicas,
         final String followerIndex,
         final int numberOfDocuments) throws IOException {
-        final ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
+        final ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
         final String chunkSize = new ByteSizeValue(randomFrom(4, 128, 1024), ByteSizeUnit.KB).getStringRep();
         settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize));
         assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@@ -129,7 +129,8 @@ private RestoreSnapshotRequest setUpRestoreSnapshotRequest(
         final Map<String, String> additionalSettings = new HashMap<>();
         additionalSettings.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200).getStringRep());
         final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalSettings);
-        assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
+        assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex)
+            .setMasterNodeTimeout(TimeValue.MAX_VALUE).setSource(leaderIndexSettings, XContentType.JSON));
         ensureLeaderGreen(leaderIndex);
 
         logger.info("indexing [{}] docs", numberOfDocuments);
@@ -152,7 +153,7 @@ private RestoreSnapshotRequest setUpRestoreSnapshotRequest(
             .indicesOptions(indicesOptions)
             .renamePattern("^(.*)$")
             .renameReplacement(followerIndex)
-            .masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS));
+            .masterNodeTimeout(TimeValue.MAX_VALUE);
     }
 
     public void testRetentionLeaseIsTakenAtTheStartOfRecovery() throws Exception {
@@ -470,7 +471,7 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {
         ensureFollowerGreen(true, followerIndex);
 
         pauseFollow(followerIndex);
-        followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet();
+        followerClient().admin().indices().close(new CloseIndexRequest(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
 
         // we will disrupt requests to remove retention leases for these random shards
         final Set<Integer> shardIds =
@@ -944,7 +945,7 @@ public void testForgetFollower() throws Exception {
         ensureFollowerGreen(true, followerIndex);
 
         pauseFollow(followerIndex);
-        followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet();
+        followerClient().admin().indices().close(new CloseIndexRequest(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
 
         final ClusterStateResponse followerIndexClusterState =
             followerClient().admin().cluster().prepareState().clear().setMetadata(true).setIndices(followerIndex).get();
@@ -96,7 +96,7 @@ public void testCloseAndReopenFollowerIndex() throws Exception {
 
         atLeastDocsIndexed(followerClient(), "index2", 32);
 
-        CloseIndexRequest closeIndexRequest = new CloseIndexRequest("index2");
+        CloseIndexRequest closeIndexRequest = new CloseIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE);
         closeIndexRequest.waitForActiveShards(ActiveShardCount.NONE);
         AcknowledgedResponse response = followerClient().admin().indices().close(closeIndexRequest).get();
         assertThat(response.isAcknowledged(), is(true));
@@ -111,7 +111,7 @@ public void testCloseAndReopenFollowerIndex() throws Exception {
             thread.join();
         }
 
-        assertAcked(followerClient().admin().indices().open(new OpenIndexRequest("index2")).get());
+        assertAcked(followerClient().admin().indices().open(new OpenIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).get());
 
         clusterState = followerClient().admin().cluster().prepareState().get().getState();
         assertThat(clusterState.metadata().index("index2").getState(), is(IndexMetadata.State.OPEN));
@@ -223,7 +223,7 @@ public void testAddNewReplicasOnFollower() throws Exception {
         });
         flushingOnFollower.start();
         awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), 50);
-        followerClient().admin().indices().prepareUpdateSettings("follower-index")
+        followerClient().admin().indices().prepareUpdateSettings("follower-index").setMasterNodeTimeout(TimeValue.MAX_VALUE)
             .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas + 1).build()).get();
         ensureFollowerGreen("follower-index");
         awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), 100);
[Diff for the remaining changed files not shown.]
