diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/PrimaryFollowerAllocationIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/PrimaryFollowerAllocationIT.java new file mode 100644 index 0000000000000..5b60207859a28 --- /dev/null +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/PrimaryFollowerAllocationIT.java @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.AllocationDecision; +import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.test.NodeRoles; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.in; + +public class PrimaryFollowerAllocationIT extends CcrIntegTestCase { + + @Override + protected boolean reuseClusters() { + return false; + } + + public void testDoNotAllocateFollowerPrimaryToNodesWithoutRemoteClusterClientRole() throws Exception { + final String leaderIndex = "leader-not-allow-index"; + final String followerIndex = "follower-not-allow-index"; + final List dataOnlyNodes = getFollowerCluster().startNodes(between(1, 2), + NodeRoles.onlyRoles(Collections.singleton(DiscoveryNodeRole.DATA_ROLE))); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex) + .setSource(getIndexSettings(between(1, 2), between(0, 1)), XContentType.JSON)); + final PutFollowAction.Request putFollowRequest = putFollow(leaderIndex, followerIndex); + putFollowRequest.setSettings(Settings.builder() + .put("index.routing.allocation.include._name", String.join(",", dataOnlyNodes)) + .build()); + putFollowRequest.waitForActiveShards(ActiveShardCount.ONE); + putFollowRequest.timeout(TimeValue.timeValueSeconds(2)); + final PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, putFollowRequest).get(); + assertFalse(response.isFollowIndexShardsAcked()); + assertFalse(response.isIndexFollowingStarted()); + final ClusterAllocationExplanation explanation = followerClient().admin().cluster().prepareAllocationExplain() + .setIndex(followerIndex).setShard(0).setPrimary(true).get().getExplanation(); + for (NodeAllocationResult nodeDecision : explanation.getShardAllocationDecision().getAllocateDecision().getNodeDecisions()) { + assertThat(nodeDecision.getNodeDecision(), equalTo(AllocationDecision.NO)); + if (dataOnlyNodes.contains(nodeDecision.getNode().getName())) { + final List decisions = nodeDecision.getCanAllocateDecision().getDecisions() + .stream().map(Object::toString).collect(Collectors.toList()); + assertThat("NO(shard is a primary follower and being bootstrapped, but node does not have the remote_cluster_client role)", + in(decisions)); + } + } + } + + public void testAllocateFollowerPrimaryToNodesWithRemoteClusterClientRole() throws Exception { + final String leaderIndex = "leader-allow-index"; + final String followerIndex = "follower-allow-index"; + final List dataOnlyNodes = getFollowerCluster().startNodes(between(2, 3), + NodeRoles.onlyRoles(Collections.singleton(DiscoveryNodeRole.DATA_ROLE))); + final List dataAndRemoteNodes = getFollowerCluster().startNodes(between(1, 2), + NodeRoles.onlyRoles(Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex) + .setSource(getIndexSettings(between(1, 2), between(0, 1)), XContentType.JSON)); + final PutFollowAction.Request putFollowRequest = putFollow(leaderIndex, followerIndex); + putFollowRequest.setSettings(Settings.builder() + .put("index.routing.rebalance.enable", "none") + .put("index.routing.allocation.include._name", + Stream.concat(dataOnlyNodes.stream(), dataAndRemoteNodes.stream()).collect(Collectors.joining(","))) + .build()); + final PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, putFollowRequest).get(); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + ensureFollowerGreen(followerIndex); + int numDocs = between(0, 20); + for (int i = 0; i < numDocs; i++) { + leaderClient().prepareIndex(leaderIndex, "_doc").setSource("f", i).get(); + } + // Empty follower primaries must be assigned to nodes with the remote cluster client role + assertBusy(() -> { + final ClusterState state = getFollowerCluster().client().admin().cluster().prepareState().get().getState(); + for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(followerIndex)) { + final ShardRouting primaryShard = shardRoutingTable.primaryShard(); + assertTrue(primaryShard.assignedToNode()); + final DiscoveryNode assignedNode = state.nodes().get(primaryShard.currentNodeId()); + assertThat(assignedNode.getName(), in(dataAndRemoteNodes)); + } + }); + // Follower primaries can be relocated to nodes without the remote cluster client role + followerClient().admin().indices().prepareUpdateSettings(followerIndex) + .setSettings(Settings.builder().put("index.routing.allocation.include._name", String.join(",", dataOnlyNodes))) + .get(); + assertBusy(() -> { + final ClusterState state = getFollowerCluster().client().admin().cluster().prepareState().get().getState(); + for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(followerIndex)) { + for (ShardRouting shard : shardRoutingTable) { + assertNotNull(shard.currentNodeId()); + final DiscoveryNode assignedNode = state.nodes().get(shard.currentNodeId()); + assertThat(assignedNode.getName(), in(dataOnlyNodes)); + } + } + }); + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + // Follower primaries can be recovered from the existing copies on nodes without the remote cluster client role + getFollowerCluster().fullRestart(); + ensureFollowerGreen(followerIndex); + assertBusy(() -> { + final ClusterState state = getFollowerCluster().client().admin().cluster().prepareState().get().getState(); + for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(followerIndex)) { + for (ShardRouting shard : shardRoutingTable) { + assertNotNull(shard.currentNodeId()); + final DiscoveryNode assignedNode = state.nodes().get(shard.currentNodeId()); + assertThat(assignedNode.getName(), in(dataOnlyNodes)); + } + } + }); + int moreDocs = between(0, 20); + for (int i = 0; i < moreDocs; i++) { + leaderClient().prepareIndex(leaderIndex, "_doc").setSource("f", i).get(); + } + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 7224ebf319736..7a2309c479780 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Module; @@ -36,6 +37,7 @@ import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; @@ -75,6 +77,7 @@ import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction; import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; +import org.elasticsearch.xpack.ccr.allocation.CcrPrimaryFollowerAllocationDecider; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import org.elasticsearch.xpack.ccr.repository.CcrRepository; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; @@ -124,7 +127,7 @@ /** * Container class for CCR functionality. */ -public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin, RepositoryPlugin { +public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin, RepositoryPlugin, ClusterPlugin { public static final String CCR_THREAD_POOL_NAME = "ccr"; public static final String CCR_CUSTOM_METADATA_KEY = "ccr"; @@ -375,4 +378,9 @@ public Collection> mapping public Collection> indicesAliasesRequestValidators() { return Collections.singletonList(CcrRequests.CCR_INDICES_ALIASES_REQUEST_VALIDATOR); } + + @Override + public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { + return List.of(new CcrPrimaryFollowerAllocationDecider()); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/allocation/CcrPrimaryFollowerAllocationDecider.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/allocation/CcrPrimaryFollowerAllocationDecider.java new file mode 100644 index 0000000000000..3aa2121245990 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/allocation/CcrPrimaryFollowerAllocationDecider.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.xpack.ccr.allocation; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.xpack.ccr.CcrSettings; + +/** + * An allocation decider that ensures primary shards of follower indices that are being bootstrapped are assigned to nodes that have the + * remote cluster client role. This is necessary as those nodes reach out to the leader shards on the remote cluster to copy Lucene segment + * files and periodically renew retention leases during the bootstrap. + */ +public final class CcrPrimaryFollowerAllocationDecider extends AllocationDecider { + static final String NAME = "ccr_primary_follower"; + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + final IndexMetadata indexMetadata = allocation.metadata().index(shardRouting.index()); + if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(indexMetadata.getSettings()) == false) { + return allocation.decision(Decision.YES, NAME, "shard is not a follower and is not under the purview of this decider"); + } + if (shardRouting.primary() == false) { + return allocation.decision(Decision.YES, NAME, "shard is a replica follower and is not under the purview of this decider"); + } + final RecoverySource recoverySource = shardRouting.recoverySource(); + if (recoverySource == null || recoverySource.getType() != RecoverySource.Type.SNAPSHOT) { + return allocation.decision(Decision.YES, NAME, + "shard is a primary follower but was bootstrapped already; hence is not under the purview of this decider"); + } + if (node.node().isRemoteClusterClient() == false) { + return allocation.decision(Decision.NO, NAME, "shard is a primary follower and being bootstrapped, but node does not have the " + + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"); + } + return allocation.decision(Decision.YES, NAME, + "shard is a primary follower and node has the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/allocation/CcrPrimaryFollowerAllocationDeciderTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/allocation/CcrPrimaryFollowerAllocationDeciderTests.java new file mode 100644 index 0000000000000..92cf1f4516885 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/allocation/CcrPrimaryFollowerAllocationDeciderTests.java @@ -0,0 +1,194 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.xpack.ccr.allocation; + +import com.carrotsearch.hppc.IntHashSet; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.xpack.ccr.CcrSettings; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; +import static org.hamcrest.Matchers.equalTo; + +public class CcrPrimaryFollowerAllocationDeciderTests extends ESAllocationTestCase { + + public void testRegularIndex() { + String index = "test-index"; + IndexMetadata.Builder indexMetadata = IndexMetadata.builder(index).settings(settings(Version.CURRENT)) + .numberOfShards(1).numberOfReplicas(1); + List nodes = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + final Set roles = new HashSet<>(); + roles.add(DiscoveryNodeRole.DATA_ROLE); + if (randomBoolean()) { + roles.add(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE); + } + nodes.add(newNode("node" + i, roles)); + } + DiscoveryNodes.Builder discoveryNodes = DiscoveryNodes.builder(); + nodes.forEach(discoveryNodes::add); + Metadata metadata = Metadata.builder().put(indexMetadata).build(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + if (randomBoolean()) { + routingTable.addAsNew(metadata.index(index)); + } else if (randomBoolean()) { + routingTable.addAsRecovery(metadata.index(index)); + } else if (randomBoolean()) { + routingTable.addAsNewRestore(metadata.index(index), newSnapshotRecoverySource(), new IntHashSet()); + } else { + routingTable.addAsRestore(metadata.index(index), newSnapshotRecoverySource()); + } + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.EMPTY_NODES).metadata(metadata).routingTable(routingTable.build()).build(); + for (int i = 0; i < clusterState.routingTable().index(index).shards().size(); i++) { + IndexShardRoutingTable shardRouting = clusterState.routingTable().index(index).shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(UNASSIGNED)); + Decision decision = executeAllocation(clusterState, shardRouting.primaryShard(), randomFrom(nodes)); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat(decision.getExplanation(), equalTo("shard is not a follower and is not under the purview of this decider")); + for (ShardRouting replica : shardRouting.replicaShards()) { + assertThat(replica.state(), equalTo(UNASSIGNED)); + decision = executeAllocation(clusterState, replica, randomFrom(nodes)); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat(decision.getExplanation(), equalTo("shard is not a follower and is not under the purview of this decider")); + } + } + } + + public void testAlreadyBootstrappedFollowerIndex() { + String index = "test-index"; + IndexMetadata.Builder indexMetadata = IndexMetadata.builder(index) + .settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)) + .numberOfShards(1).numberOfReplicas(1); + List nodes = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + final Set roles = new HashSet<>(); + roles.add(DiscoveryNodeRole.DATA_ROLE); + if (randomBoolean()) { + roles.add(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE); + } + nodes.add(newNode("node" + i, roles)); + } + DiscoveryNodes.Builder discoveryNodes = DiscoveryNodes.builder(); + nodes.forEach(discoveryNodes::add); + Metadata metadata = Metadata.builder().put(indexMetadata).build(); + RoutingTable.Builder routingTable = RoutingTable.builder().addAsRecovery(metadata.index(index)); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(discoveryNodes).metadata(metadata).routingTable(routingTable.build()).build(); + for (int i = 0; i < clusterState.routingTable().index(index).shards().size(); i++) { + IndexShardRoutingTable shardRouting = clusterState.routingTable().index(index).shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(UNASSIGNED)); + Decision decision = executeAllocation(clusterState, shardRouting.primaryShard(), randomFrom(nodes)); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat(decision.getExplanation(), + equalTo("shard is a primary follower but was bootstrapped already; hence is not under the purview of this decider")); + for (ShardRouting replica : shardRouting.replicaShards()) { + assertThat(replica.state(), equalTo(UNASSIGNED)); + decision = executeAllocation(clusterState, replica, randomFrom(nodes)); + assertThat(decision.type(), equalTo(Decision.Type.YES)); + assertThat(decision.getExplanation(), equalTo("shard is a replica follower and is not under the purview of this decider")); + } + } + } + + public void testBootstrappingFollowerIndex() { + String index = "test-index"; + IndexMetadata.Builder indexMetadata = IndexMetadata.builder(index) + .settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)) + .numberOfShards(1).numberOfReplicas(1); + DiscoveryNode dataOnlyNode = newNode("d1", Set.of(DiscoveryNodeRole.DATA_ROLE)); + DiscoveryNode dataAndRemoteNode = newNode("dr1", Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(dataOnlyNode).add(dataAndRemoteNode).build(); + Metadata metadata = Metadata.builder().put(indexMetadata).build(); + RoutingTable.Builder routingTable = RoutingTable.builder() + .addAsNewRestore(metadata.index(index), newSnapshotRecoverySource(), new IntHashSet()); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(discoveryNodes).metadata(metadata).routingTable(routingTable.build()).build(); + for (int i = 0; i < clusterState.routingTable().index(index).shards().size(); i++) { + IndexShardRoutingTable shardRouting = clusterState.routingTable().index(index).shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(UNASSIGNED)); + Decision noDecision = executeAllocation(clusterState, shardRouting.primaryShard(), dataOnlyNode); + assertThat(noDecision.type(), equalTo(Decision.Type.NO)); + assertThat(noDecision.getExplanation(), + equalTo("shard is a primary follower and being bootstrapped, but node does not have the remote_cluster_client role")); + Decision yesDecision = executeAllocation(clusterState, shardRouting.primaryShard(), dataAndRemoteNode); + assertThat(yesDecision.type(), equalTo(Decision.Type.YES)); + assertThat(yesDecision.getExplanation(), equalTo("shard is a primary follower and node has the remote_cluster_client role")); + for (ShardRouting replica : shardRouting.replicaShards()) { + assertThat(replica.state(), equalTo(UNASSIGNED)); + yesDecision = executeAllocation(clusterState, replica, randomFrom(dataOnlyNode, dataAndRemoteNode)); + assertThat(yesDecision.type(), equalTo(Decision.Type.YES)); + assertThat(yesDecision.getExplanation(), + equalTo("shard is a replica follower and is not under the purview of this decider")); + } + } + } + + static Decision executeAllocation(ClusterState clusterState, ShardRouting shardRouting, DiscoveryNode node) { + final AllocationDecider decider = new CcrPrimaryFollowerAllocationDecider(); + final RoutingAllocation routingAllocation = new RoutingAllocation(new AllocationDeciders(List.of(decider)), + new RoutingNodes(clusterState), clusterState, ClusterInfo.EMPTY, System.nanoTime()); + routingAllocation.debugDecision(true); + return decider.canAllocate(shardRouting, new RoutingNode(node.getId(), node), routingAllocation); + } + + static RecoverySource.SnapshotRecoverySource newSnapshotRecoverySource() { + Snapshot snapshot = new Snapshot("repo", new SnapshotId("name", "_uuid")); + return new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, + new IndexId("test", UUIDs.randomBase64UUID(random()))); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index a423ebc30ca1e..381e8b86e0dc5 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -475,6 +476,14 @@ public Collection> ind .collect(Collectors.toList()); } + @Override + public Collection createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { + return filterPlugins(ClusterPlugin.class) + .stream() + .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream()) + .collect(Collectors.toList()); + } + private List filterPlugins(Class type) { return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p)) .collect(Collectors.toList());