From f238d0a3678452b3fc944739effc034e46528eb9 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 27 Jul 2020 09:23:30 -0400 Subject: [PATCH] Set timeout of master requests on follower to unbounded (#60070) Today, a follow task will fail if the master node of the follower cluster is temporarily overloaded and unable to process master node requests (such as update mapping, setting, or alias) from a follow-task within the default timeout. This error is transient, and follow-tasks should not abort. We can avoid this problem by setting the timeout of master node requests on the follower cluster to unbounded. Closes #56891 --- .../elasticsearch/common/unit/TimeValue.java | 1 + .../xpack/ccr/CcrRepositoryIT.java | 26 +++++----- .../xpack/ccr/CcrRetentionLeaseIT.java | 11 +++-- .../xpack/ccr/CloseFollowerIndexIT.java | 4 +- .../xpack/ccr/FollowerFailOverIT.java | 2 +- .../xpack/ccr/IndexFollowingIT.java | 45 +++++++++-------- .../ccr/PrimaryFollowerAllocationIT.java | 1 + .../xpack/ccr/action/CcrRequests.java | 1 + .../ccr/action/ShardFollowTasksExecutor.java | 14 +++--- .../xpack/ccr/repository/CcrRepository.java | 3 +- .../elasticsearch/xpack/CcrIntegTestCase.java | 48 ++++++++++++++++++- 11 files changed, 108 insertions(+), 48 deletions(-) diff --git a/libs/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java b/libs/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java index b2f9128673eb4..bbca172dc421d 100644 --- a/libs/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java +++ b/libs/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java @@ -30,6 +30,7 @@ public class TimeValue implements Comparable { public static final TimeValue MINUS_ONE = timeValueMillis(-1); public static final TimeValue ZERO = timeValueMillis(0); + public static final TimeValue MAX_VALUE = TimeValue.timeValueNanos(Long.MAX_VALUE); private static final long C0 = 1L; private static final long C1 = C0 * 1000L; diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 37a84282cfd57..91fd884f6becd 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -69,7 +69,7 @@ public void testThatRepositoryIsPutAndRemovedWhenRemoteClusterIsUpdated() throws fail("need repository"); } - ClusterUpdateSettingsRequest putSecondCluster = new ClusterUpdateSettingsRequest(); + ClusterUpdateSettingsRequest putSecondCluster = newSettingsRequest(); String address = getFollowerCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); putSecondCluster.persistentSettings(Settings.builder().put("cluster.remote.follower_cluster_copy.seeds", address)); assertAcked(followerClient().admin().cluster().updateSettings(putSecondCluster).actionGet()); @@ -83,19 +83,19 @@ public void testThatRepositoryIsPutAndRemovedWhenRemoteClusterIsUpdated() throws fail("need repository"); } - ClusterUpdateSettingsRequest deleteLeaderCluster = new ClusterUpdateSettingsRequest(); + ClusterUpdateSettingsRequest deleteLeaderCluster = newSettingsRequest(); deleteLeaderCluster.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", "")); assertAcked(followerClient().admin().cluster().updateSettings(deleteLeaderCluster).actionGet()); expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(leaderClusterRepoName)); - ClusterUpdateSettingsRequest deleteSecondCluster = new ClusterUpdateSettingsRequest(); + ClusterUpdateSettingsRequest deleteSecondCluster = newSettingsRequest(); deleteSecondCluster.persistentSettings(Settings.builder().put("cluster.remote.follower_cluster_copy.seeds", "")); assertAcked(followerClient().admin().cluster().updateSettings(deleteSecondCluster).actionGet()); expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(followerCopyRepoName)); - ClusterUpdateSettingsRequest putLeaderRequest = new ClusterUpdateSettingsRequest(); + ClusterUpdateSettingsRequest putLeaderRequest = newSettingsRequest(); address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); putLeaderRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address)); assertAcked(followerClient().admin().cluster().updateSettings(putLeaderRequest).actionGet()); @@ -119,7 +119,7 @@ public void testThatRepositoryRecoversEmptyIndexBasedOnLeaderSettings() throws I .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST) .indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$") - .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS)) + .renameReplacement(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE) .indexSettings(settingsBuilder); PlainActionFuture future = PlainActionFuture.newFuture(); @@ -160,7 +160,7 @@ public void testThatRepositoryRecoversEmptyIndexBasedOnLeaderSettings() throws I } public void testDocsAreRecovered() throws Exception { - ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + ClusterUpdateSettingsRequest settingsRequest = newSettingsRequest(); String chunkSize = randomFrom("4KB", "128KB", "1MB"); settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize)); assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); @@ -204,7 +204,7 @@ public void testDocsAreRecovered() throws Exception { assertExpectedDocument(followerIndex, i); } - settingsRequest = new ClusterUpdateSettingsRequest(); + settingsRequest = newSettingsRequest(); ByteSizeValue defaultValue = CcrSettings.RECOVERY_CHUNK_SIZE.getDefault(Settings.EMPTY); settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), defaultValue)); assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); @@ -213,7 +213,7 @@ public void testDocsAreRecovered() throws Exception { public void testRateLimitingIsEmployed() throws Exception { boolean followerRateLimiting = randomBoolean(); - ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + ClusterUpdateSettingsRequest settingsRequest = newSettingsRequest(); settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K")); if (followerRateLimiting) { assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); @@ -257,7 +257,7 @@ public void testRateLimitingIsEmployed() throws Exception { .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST) .indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$") - .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS)) + .renameReplacement(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE) .indexSettings(settingsBuilder); PlainActionFuture future = PlainActionFuture.newFuture(); @@ -270,7 +270,7 @@ public void testRateLimitingIsEmployed() throws Exception { assertTrue(restoreSources.stream().anyMatch(cr -> cr.getThrottleTime() > 0)); } - settingsRequest = new ClusterUpdateSettingsRequest(); + settingsRequest = newSettingsRequest(); ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY); settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue)); if (followerRateLimiting) { @@ -281,7 +281,7 @@ public void testRateLimitingIsEmployed() throws Exception { } public void testIndividualActionsTimeout() throws Exception { - ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + ClusterUpdateSettingsRequest settingsRequest = newSettingsRequest(); TimeValue timeValue = TimeValue.timeValueMillis(100); settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(), timeValue)); assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); @@ -359,6 +359,10 @@ public void testIndividualActionsTimeout() throws Exception { } } + private ClusterUpdateSettingsRequest newSettingsRequest() { + return new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE); + } + public void testFollowerMappingIsUpdated() throws IOException { String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; String leaderIndex = "index1"; diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 5708990657181..38e9cbc45c0f8 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -119,7 +119,7 @@ private RestoreSnapshotRequest setUpRestoreSnapshotRequest( final int numberOfReplicas, final String followerIndex, final int numberOfDocuments) throws IOException { - final ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + final ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE); final String chunkSize = new ByteSizeValue(randomFrom(4, 128, 1024), ByteSizeUnit.KB).getStringRep(); settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize)); assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); @@ -129,7 +129,8 @@ private RestoreSnapshotRequest setUpRestoreSnapshotRequest( final Map additionalSettings = new HashMap<>(); additionalSettings.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200).getStringRep()); final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalSettings); - assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON)); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex) + .setMasterNodeTimeout(TimeValue.MAX_VALUE).setSource(leaderIndexSettings, XContentType.JSON)); ensureLeaderGreen(leaderIndex); logger.info("indexing [{}] docs", numberOfDocuments); @@ -152,7 +153,7 @@ private RestoreSnapshotRequest setUpRestoreSnapshotRequest( .indicesOptions(indicesOptions) .renamePattern("^(.*)$") .renameReplacement(followerIndex) - .masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS)); + .masterNodeTimeout(TimeValue.MAX_VALUE); } public void testRetentionLeaseIsTakenAtTheStartOfRecovery() throws Exception { @@ -470,7 +471,7 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception { ensureFollowerGreen(true, followerIndex); pauseFollow(followerIndex); - followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet(); + followerClient().admin().indices().close(new CloseIndexRequest(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet(); // we will disrupt requests to remove retention leases for these random shards final Set shardIds = @@ -944,7 +945,7 @@ public void testForgetFollower() throws Exception { ensureFollowerGreen(true, followerIndex); pauseFollow(followerIndex); - followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet(); + followerClient().admin().indices().close(new CloseIndexRequest(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet(); final ClusterStateResponse followerIndexClusterState = followerClient().admin().cluster().prepareState().clear().setMetadata(true).setIndices(followerIndex).get(); diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java index de9d2a542cc90..1f4599c326a63 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java @@ -96,7 +96,7 @@ public void testCloseAndReopenFollowerIndex() throws Exception { atLeastDocsIndexed(followerClient(), "index2", 32); - CloseIndexRequest closeIndexRequest = new CloseIndexRequest("index2"); + CloseIndexRequest closeIndexRequest = new CloseIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE); closeIndexRequest.waitForActiveShards(ActiveShardCount.NONE); AcknowledgedResponse response = followerClient().admin().indices().close(closeIndexRequest).get(); assertThat(response.isAcknowledged(), is(true)); @@ -111,7 +111,7 @@ public void testCloseAndReopenFollowerIndex() throws Exception { thread.join(); } - assertAcked(followerClient().admin().indices().open(new OpenIndexRequest("index2")).get()); + assertAcked(followerClient().admin().indices().open(new OpenIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).get()); clusterState = followerClient().admin().cluster().prepareState().get().getState(); assertThat(clusterState.metadata().index("index2").getState(), is(IndexMetadata.State.OPEN)); diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java index 570281a43ac76..82919a32df4ca 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java @@ -223,7 +223,7 @@ public void testAddNewReplicasOnFollower() throws Exception { }); flushingOnFollower.start(); awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), 50); - followerClient().admin().indices().prepareUpdateSettings("follower-index") + followerClient().admin().indices().prepareUpdateSettings("follower-index").setMasterNodeTimeout(TimeValue.MAX_VALUE) .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas + 1).build()).get(); ensureFollowerGreen("follower-index"); awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), 100); diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index abb9942cc9498..05056166c23ab 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -138,7 +138,7 @@ public void testFollowIndex() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); int numberOfReplicas = between(0, 1); - followerClient().admin().cluster().prepareUpdateSettings() + followerClient().admin().cluster().prepareUpdateSettings().setMasterNodeTimeout(TimeValue.MAX_VALUE) .setTransientSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), new ByteSizeValue(randomIntBetween(1, 1000), ByteSizeUnit.KB))) .get(); @@ -410,6 +410,7 @@ public void testNoMappingDefined() throws Exception { } public void testDoNotAllowPutMappingToFollower() throws Exception { + removeMasterNodeRequestsValidatorOnFollowerCluster(); final String leaderIndexSettings = getIndexSettings(between(1, 2), between(0, 1)); assertAcked(leaderClient().admin().indices().prepareCreate("index-1").setSource(leaderIndexSettings, XContentType.JSON)); followerClient().execute(PutFollowAction.INSTANCE, putFollow("index-1", "index-2")).get(); @@ -432,7 +433,7 @@ public void testDoNotAllowAddAliasToFollower() throws Exception { getIndexSettings(between(1, 2), between(0, 1)); assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader", "follower")).get(); - final IndicesAliasesRequest request = new IndicesAliasesRequest() + final IndicesAliasesRequest request = new IndicesAliasesRequest().masterNodeTimeout(TimeValue.MAX_VALUE) .addAliasAction(IndicesAliasesRequest.AliasActions.add().index("follower").alias("follower_alias")); final ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> followerClient().admin().indices().aliases(request).actionGet()); @@ -449,10 +450,10 @@ public void testAddAliasAfterUnfollow() throws Exception { assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader", "follower")).get(); pauseFollow("follower"); - followerClient().admin().indices().close(new CloseIndexRequest("follower")).actionGet(); + followerClient().admin().indices().close(new CloseIndexRequest("follower").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet(); assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("follower")).actionGet()); - followerClient().admin().indices().open(new OpenIndexRequest("follower")).actionGet(); - final IndicesAliasesRequest request = new IndicesAliasesRequest() + followerClient().admin().indices().open(new OpenIndexRequest("follower").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet(); + final IndicesAliasesRequest request = new IndicesAliasesRequest().masterNodeTimeout(TimeValue.MAX_VALUE) .addAliasAction(IndicesAliasesRequest.AliasActions.add().index("follower").alias("follower_alias")); assertAcked(followerClient().admin().indices().aliases(request).actionGet()); final GetAliasesResponse response = @@ -576,7 +577,10 @@ public void testUnfollowNonExistingIndex() { public void testFollowNonExistentIndex() throws Exception { String indexSettings = getIndexSettings(1, 0); assertAcked(leaderClient().admin().indices().prepareCreate("test-leader").setSource(indexSettings, XContentType.JSON).get()); - assertAcked(followerClient().admin().indices().prepareCreate("test-follower").setSource(indexSettings, XContentType.JSON).get()); + assertAcked(followerClient().admin().indices().prepareCreate("test-follower") + .setSource(indexSettings, XContentType.JSON) + .setMasterNodeTimeout(TimeValue.MAX_VALUE) + .get()); ensureLeaderGreen("test-leader"); ensureFollowerGreen("test-follower"); // Leader index does not exist. @@ -635,9 +639,9 @@ public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception { PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); pauseFollow("index2"); - followerClient().admin().indices().close(new CloseIndexRequest("index2")).actionGet(); + followerClient().admin().indices().close(new CloseIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet(); - UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("index2"); + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE); updateSettingsRequest.settings(Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), false).build()); Exception e = expectThrows(IllegalArgumentException.class, () -> followerClient().admin().indices().updateSettings(updateSettingsRequest).actionGet()); @@ -691,7 +695,7 @@ public void testCloseFollowIndex() throws Exception { leaderClient().prepareIndex("index1").setId("1").setSource("{}", XContentType.JSON).get(); assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L))); - followerClient().admin().indices().close(new CloseIndexRequest("index2")).actionGet(); + followerClient().admin().indices().close(new CloseIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet(); leaderClient().prepareIndex("index1").setId("2").setSource("{}", XContentType.JSON).get(); assertBusy(() -> { StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet(); @@ -700,7 +704,7 @@ public void testCloseFollowIndex() throws Exception { assertThat(response.getStatsResponses(), hasSize(1)); assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L)); }); - followerClient().admin().indices().open(new OpenIndexRequest("index2")).actionGet(); + followerClient().admin().indices().open(new OpenIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet(); assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(2L))); pauseFollow("index2"); @@ -765,7 +769,7 @@ public void testResumeFollowOnClosedIndex() throws Exception { assertTrue(response.isIndexFollowingStarted()); pauseFollow(followerIndex); - assertAcked(leaderClient().admin().indices().prepareClose(leaderIndex)); + assertAcked(leaderClient().admin().indices().prepareClose(leaderIndex).setMasterNodeTimeout(TimeValue.MAX_VALUE)); expectThrows(IndexClosedException.class, () -> followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow(followerIndex)).actionGet()); @@ -784,7 +788,7 @@ public void testDeleteFollowerIndex() throws Exception { leaderClient().prepareIndex("index1").setId("1").setSource("{}", XContentType.JSON).get(); assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L))); - followerClient().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet(); + followerClient().admin().indices().delete(new DeleteIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet(); leaderClient().prepareIndex("index1").setId("2").setSource("{}", XContentType.JSON).get(); assertBusy(() -> { StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet(); @@ -808,7 +812,7 @@ public void testPauseIndex() throws Exception { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .build())); followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader", "follower")).get(); - assertAcked(followerClient().admin().indices().prepareCreate("regular-index")); + assertAcked(followerClient().admin().indices().prepareCreate("regular-index").setMasterNodeTimeout(TimeValue.MAX_VALUE)); assertAcked(followerClient().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower")).actionGet()); assertThat(expectThrows(IllegalArgumentException.class, () -> followerClient().execute( PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower")).actionGet()).getMessage(), @@ -836,9 +840,9 @@ public void testUnfollowIndex() throws Exception { // Turn follow index into a regular index by: pausing shard follow, close index, unfollow index and then open index: pauseFollow("index2"); - followerClient().admin().indices().close(new CloseIndexRequest("index2")).actionGet(); + followerClient().admin().indices().close(new CloseIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet(); assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2")).actionGet()); - followerClient().admin().indices().open(new OpenIndexRequest("index2")).actionGet(); + followerClient().admin().indices().open(new OpenIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet(); ensureFollowerGreen("index2"); // Indexing succeeds now, because index2 is no longer a follow index: @@ -1144,6 +1148,7 @@ public void onFailure(String source, Exception e) { } public void testMustCloseIndexAndPauseToRestartWithPutFollowing() throws Exception { + removeMasterNodeRequestsValidatorOnFollowerCluster(); final int numberOfPrimaryShards = randomIntBetween(1, 3); final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1)); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); @@ -1290,7 +1295,7 @@ private void runFallBehindTest( exceptionConsumer.accept(exceptions); }); - followerClient().admin().indices().prepareClose("index2").get(); + followerClient().admin().indices().prepareClose("index2").setMasterNodeTimeout(TimeValue.MAX_VALUE).get(); pauseFollow("index2"); if (randomBoolean()) { assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2")).actionGet()); @@ -1326,11 +1331,11 @@ public void testUpdateRemoteConfigsDuringFollowing() throws Exception { assertTrue(response.isFollowIndexShardsAcked()); assertTrue(response.isIndexFollowingStarted()); - logger.info("Indexing [{}] docs while updateing remote config", firstBatchNumDocs); + logger.info("Indexing [{}] docs while updating remote config", firstBatchNumDocs); try (BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs, randomIntBetween(1, 5))) { - ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE); String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); Setting compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster"); Setting> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("leader_cluster"); @@ -1361,7 +1366,7 @@ public void testUpdateRemoteConfigsDuringFollowing() throws Exception { assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); } finally { - ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE); String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); Setting compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster"); Setting> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("leader_cluster"); @@ -1391,7 +1396,7 @@ public void testCleanUpShardFollowTasksForDeletedIndices() throws Exception { assertThat(listTasksResponse.getTasks(), hasSize(numberOfShards)); }); - assertAcked(followerClient().admin().indices().prepareDelete("index2")); + assertAcked(followerClient().admin().indices().prepareDelete("index2").setMasterNodeTimeout(TimeValue.MAX_VALUE)); assertBusy(() -> { String action = ShardFollowTask.NAME + "[c]"; diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/PrimaryFollowerAllocationIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/PrimaryFollowerAllocationIT.java index b773b43f54feb..5cbf9e786ec42 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/PrimaryFollowerAllocationIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/PrimaryFollowerAllocationIT.java @@ -102,6 +102,7 @@ public void testAllocateFollowerPrimaryToNodesWithRemoteClusterClientRole() thro }); // Follower primaries can be relocated to nodes without the remote cluster client role followerClient().admin().indices().prepareUpdateSettings(followerIndex) + .setMasterNodeTimeout(TimeValue.MAX_VALUE) .setSettings(Settings.builder().put("index.routing.allocation.include._name", String.join(",", dataOnlyNodes))) .get(); assertBusy(() -> { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java index 0bf9bebbd0169..3cbce7033d61e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java @@ -43,6 +43,7 @@ public static PutMappingRequest putMappingRequest(String followerIndex, MappingM PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex); putMappingRequest.origin("ccr"); putMappingRequest.source(mappingMetadata.source().string(), XContentType.JSON); + putMappingRequest.masterNodeTimeout(TimeValue.MAX_VALUE); return putMappingRequest; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 3cd41d602598d..f53e318c7da2c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -205,8 +205,9 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum // if so just update the follower index's settings: if (updatedSettings.keySet().stream().allMatch(indexScopedSettings::isDynamicSetting)) { // If only dynamic settings have been updated then just update these settings in follower index: - final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex.getName()); - updateSettingsRequest.settings(updatedSettings); + final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex.getName()) + .masterNodeTimeout(TimeValue.MAX_VALUE) + .settings(updatedSettings); followerClient.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(response -> finalHandler.accept(leaderIMD.getSettingsVersion()), errorHandler)); } else { @@ -327,7 +328,7 @@ protected void innerUpdateAliases(final LongConsumer handler, final Consumer onFailure) { - CloseIndexRequest closeRequest = new CloseIndexRequest(followIndex); + CloseIndexRequest closeRequest = new CloseIndexRequest(followIndex).masterNodeTimeout(TimeValue.MAX_VALUE); CheckedConsumer onResponse = response -> { updateSettingsAndOpenIndex(followIndex, updatedSettings, handler, onFailure); }; @@ -358,7 +359,8 @@ private void updateSettingsAndOpenIndex(String followIndex, Settings updatedSettings, Runnable handler, Consumer onFailure) { - final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex); + final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex) + .masterNodeTimeout(TimeValue.MAX_VALUE); updateSettingsRequest.settings(updatedSettings); CheckedConsumer onResponse = response -> openIndex(followIndex, handler, onFailure); followerClient.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(onResponse, onFailure)); @@ -367,7 +369,7 @@ private void updateSettingsAndOpenIndex(String followIndex, private void openIndex(String followIndex, Runnable handler, Consumer onFailure) { - OpenIndexRequest openIndexRequest = new OpenIndexRequest(followIndex); + OpenIndexRequest openIndexRequest = new OpenIndexRequest(followIndex).masterNodeTimeout(TimeValue.MAX_VALUE); CheckedConsumer onResponse = response -> handler.run(); followerClient.admin().indices().open(openIndexRequest, ActionListener.wrap(onResponse, onFailure)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 611d11f1e2e08..6bd8e02da356e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -454,8 +454,7 @@ private void updateMappings(Client leaderClient, Index leaderIndex, long leaderM final IndexMetadata leaderIndexMetadata = indexMetadataFuture.actionGet(ccrSettings.getRecoveryActionTimeout()); final MappingMetadata mappingMetadata = leaderIndexMetadata.mapping(); if (mappingMetadata != null) { - final PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetadata) - .masterNodeTimeout(TimeValue.timeValueMinutes(30)); + final PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetadata); followerClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout()); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 502c36d22c10a..e9dead150ec42 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; @@ -77,6 +78,7 @@ import org.elasticsearch.test.TestCluster; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.RemoteConnectionStrategy; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.nio.MockNioTransportPlugin; import org.elasticsearch.xpack.ccr.CcrSettings; @@ -84,11 +86,16 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; +import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -146,6 +153,7 @@ public final void startClusters() throws Exception { if (clusterGroup != null && reuseClusters()) { clusterGroup.leaderCluster.ensureAtMostNumDataNodes(numberOfNodesPerCluster()); clusterGroup.followerCluster.ensureAtMostNumDataNodes(numberOfNodesPerCluster()); + setupMasterNodeRequestsValidatorOnFollowerCluster(); return; } @@ -176,6 +184,43 @@ public final void startClusters() throws Exception { ClusterService clusterService = followerCluster.getInstance(ClusterService.class); assertNotNull(clusterService.state().metadata().custom(LicensesMetadata.TYPE)); }); + setupMasterNodeRequestsValidatorOnFollowerCluster(); + } + + protected void setupMasterNodeRequestsValidatorOnFollowerCluster() { + final InternalTestCluster followerCluster = clusterGroup.followerCluster; + for (String nodeName : followerCluster.getNodeNames()) { + MockTransportService transportService = (MockTransportService) followerCluster.getInstance(TransportService.class, nodeName); + transportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (isCcrAdminRequest(request) == false && request instanceof AcknowledgedRequest) { + final TimeValue masterTimeout = ((AcknowledgedRequest) request).masterNodeTimeout(); + if (masterTimeout == null || masterTimeout.nanos() != TimeValue.MAX_VALUE.nanos()) { + throw new AssertionError("time out of a master request [" + request + "] on the follower is not set to unbounded"); + } + } + connection.sendRequest(requestId, action, request, options); + }); + } + } + + protected void removeMasterNodeRequestsValidatorOnFollowerCluster() { + final InternalTestCluster followerCluster = clusterGroup.followerCluster; + for (String nodeName : followerCluster.getNodeNames()) { + MockTransportService transportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, nodeName); + transportService.clearAllRules(); + } + } + + private static boolean isCcrAdminRequest(TransportRequest request) { + return request instanceof PutFollowAction.Request || + request instanceof ResumeFollowAction.Request || + request instanceof PauseFollowAction.Request || + request instanceof UnfollowAction.Request || + request instanceof ForgetFollowerAction.Request || + request instanceof PutAutoFollowPatternAction.Request || + request instanceof ActivateAutoFollowPatternAction.Request || + request instanceof DeleteAutoFollowPatternAction.Request; } /** @@ -183,7 +228,7 @@ public final void startClusters() throws Exception { * is not replicated and if tests kill nodes, we have to wait 60s by default... */ protected void disableDelayedAllocation(String index) { - UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index); + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index).masterNodeTimeout(TimeValue.MAX_VALUE); Settings.Builder settingsBuilder = Settings.builder(); settingsBuilder.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0); updateSettingsRequest.settings(settingsBuilder); @@ -193,6 +238,7 @@ protected void disableDelayedAllocation(String index) { @After public void afterTest() throws Exception { ensureEmptyWriteBuffers(); + removeMasterNodeRequestsValidatorOnFollowerCluster(); String masterNode = clusterGroup.followerCluster.getMasterName(); ClusterService clusterService = clusterGroup.followerCluster.getInstance(ClusterService.class, masterNode); removeCCRRelatedMetadataFromClusterState(clusterService);