diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 13dc84b858243..cca60579ae6f9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -596,6 +596,9 @@ static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, List followedIndexUUIDs) { List leaderIndicesToFollow = new ArrayList<>(); for (IndexMetaData leaderIndexMetaData : remoteClusterState.getMetaData()) { + if (leaderIndexMetaData.getState() != IndexMetaData.State.OPEN) { + continue; + } if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex()); if (indexRoutingTable != null && diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 75606699b1159..a80b2ae71349a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -5,8 +5,10 @@ */ package org.elasticsearch.xpack.ccr.action; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -18,11 +20,13 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; @@ -44,10 +48,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -57,6 +63,7 @@ import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -416,6 +423,26 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() { assertThat(result.get(1).getName(), equalTo("index2")); } + public void testGetLeaderIndicesToFollowWithClosedIndices() { + final AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), + null, null, null, null, null, null, null, null, null, null, null); + + // index is opened + ClusterState remoteState = ClusterStateCreationUtils.stateWithActivePrimary("test-index", true, randomIntBetween(1, 3), 0); + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList()); + assertThat(result.size(), equalTo(1)); + assertThat(result, hasItem(remoteState.metaData().index("test-index").getIndex())); + + // index is closed + remoteState = ClusterState.builder(remoteState) + .metaData(MetaData.builder(remoteState.metaData()) + .put(IndexMetaData.builder(remoteState.metaData().index("test-index")).state(IndexMetaData.State.CLOSE).build(), true) + .build()) + .build(); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList()); + assertThat(result.size(), equalTo(0)); + } + public void testRecordLeaderIndexAsFollowFunction() { AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(), Collections.singletonMap("pattern1", Collections.emptyList()), Collections.emptyMap()); @@ -763,7 +790,9 @@ void updateAutoFollowMetadata(Function updateFunctio autoFollower.start(); assertThat(allResults.size(), equalTo(states.length)); for (int i = 0; i < states.length; i++) { - assertThat(allResults.get(i).autoFollowExecutionResults.containsKey(new Index("logs-" + i, "_na_")), is(true)); + final String indexName = "logs-" + i; + assertThat(allResults.get(i).autoFollowExecutionResults.keySet().stream() + .anyMatch(index -> index.getName().equals(indexName)), is(true)); } } @@ -1049,6 +1078,87 @@ void updateAutoFollowMetadata( } } + public void testClosedIndicesAreNotAutoFollowed() { + final Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + final String pattern = "pattern1"; + final ClusterState localState = ClusterState.builder(new ClusterName("local")) + .metaData(MetaData.builder() + .putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.singletonMap(pattern, + new AutoFollowPattern("remote", Collections.singletonList("docs-*"), null, null, null, null, null, null, null, null, + null, null, null)), + Collections.singletonMap(pattern, Collections.emptyList()), + Collections.singletonMap(pattern, Collections.emptyMap())))) + .build(); + + ClusterState remoteState = null; + final int nbLeaderIndices = randomInt(15); + for (int i = 0; i < nbLeaderIndices; i++) { + String indexName = "docs-" + i; + if (remoteState == null) { + remoteState = createRemoteClusterState(indexName, true); + } else { + remoteState = createRemoteClusterState(remoteState, indexName); + } + if (randomBoolean()) { + // randomly close the index + remoteState = ClusterState.builder(remoteState.getClusterName()) + .routingTable(remoteState.routingTable()) + .metaData(MetaData.builder(remoteState.metaData()) + .put(IndexMetaData.builder(remoteState.metaData().index(indexName)).state(IndexMetaData.State.CLOSE).build(), true) + .build()) + .build(); + } + } + + final ClusterState finalRemoteState = remoteState; + final AtomicReference lastModifiedClusterState = new AtomicReference<>(localState); + final List results = new ArrayList<>(); + final Set followedIndices = ConcurrentCollections.newConcurrentSet(); + final AutoFollower autoFollower = + new AutoFollower("remote", results::addAll, localClusterStateSupplier(localState), () -> 1L, Runnable::run) { + @Override + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + assertThat(remoteCluster, equalTo("remote")); + handler.accept(new ClusterStateResponse(new ClusterName("remote"), finalRemoteState, false), null); + } + + @Override + void createAndFollow(Map headers, + PutFollowAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + followedIndices.add(followRequest.getLeaderIndex()); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + lastModifiedClusterState.updateAndGet(updateFunction::apply); + handler.accept(null); + } + + @Override + void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List patterns) { + // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice + } + }; + autoFollower.start(); + + assertThat(results, notNullValue()); + assertThat(results.size(), equalTo(1)); + + for (ObjectObjectCursor index : remoteState.metaData().indices()) { + boolean expect = index.value.getState() == IndexMetaData.State.OPEN; + assertThat(results.get(0).autoFollowExecutionResults.containsKey(index.value.getIndex()), is(expect)); + assertThat(followedIndices.contains(index.key), is(expect)); + } + } + private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) { Settings.Builder indexSettings; if (enableSoftDeletes != null) { @@ -1075,11 +1185,13 @@ private static ClusterState createRemoteClusterState(String indexName, Boolean e private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) { IndexMetaData indexMetaData = IndexMetaData.builder(indexName) - .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) + .settings(settings(Version.CURRENT) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))) .numberOfShards(1) .numberOfReplicas(0) .build(); - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote")) + ClusterState.Builder csBuilder = ClusterState.builder(previous.getClusterName()) .metaData(MetaData.builder(previous.metaData()) .version(previous.metaData().version() + 1) .put(indexMetaData, true)); @@ -1087,7 +1199,7 @@ private static ClusterState createRemoteClusterState(ClusterState previous, Stri ShardRouting shardRouting = TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build(); - csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + csBuilder.routingTable(RoutingTable.builder(previous.routingTable()).add(indexRoutingTable).build()).build(); return csBuilder.build(); }