Skip to content

Commit

Permalink
Skip Filter Allocation Decider during mixed mode for existing indices… (
Browse files Browse the repository at this point in the history
opensearch-project#12960)

Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna authored Apr 9, 2024
1 parent bad49ef commit 6532caa
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.List;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteMigrationAllocationDeciderIT extends MigrationBaseTestCase {

// When the primary is on doc rep node, existing replica copy can get allocated on excluded docrep node.
public void testFilterAllocationSkipsReplica() throws IOException {
addRemote = false;
List<String> docRepNodes = internalCluster().startNodes(3);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0")
.build()
);
ensureGreen("test");

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

assertTrue(
internalCluster().client()
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put("index.routing.allocation.exclude._name", String.join(",", docRepNodes)))
.execute()
.actionGet()
.isAcknowledged()
);
internalCluster().stopRandomDataNode();
ensureGreen("test");
}

// When the primary is on remote node, new replica copy shouldn't get allocated on an excluded docrep node.
public void testFilterAllocationSkipsReplicaOnExcludedNode() throws IOException {
addRemote = false;
List<String> nodes = internalCluster().startNodes(2);
createIndex(
"test",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0")
.build()
);
ensureGreen("test");
addRemote = true;

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
String remoteNode = internalCluster().startNode();

client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode))
.execute()
.actionGet();
client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();
assertEquals(remoteNode, primaryNodeName("test"));

assertTrue(
internalCluster().client()
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put("index.routing.allocation.exclude._name", String.join(",", nodes)))
.execute()
.actionGet()
.isAcknowledged()
);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNodeName("test")));
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(2))
.execute()
.actionGet();
assertTrue(clusterHealthResponse.isTimedOut());
ensureYellow("test");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;

import java.util.Map;

Expand Down Expand Up @@ -102,14 +104,32 @@ public class FilterAllocationDecider extends AllocationDecider {
private volatile DiscoveryNodeFilters clusterRequireFilters;
private volatile DiscoveryNodeFilters clusterIncludeFilters;
private volatile DiscoveryNodeFilters clusterExcludeFilters;
private volatile RemoteStoreNodeService.Direction migrationDirection;
private volatile RemoteStoreNodeService.CompatibilityMode compatibilityMode;

public FilterAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
setClusterRequireFilters(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING.getAsMap(settings));
setClusterExcludeFilters(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING.getAsMap(settings));
setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings);
this.compatibilityMode = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings);

clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {});
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {});
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {});
clusterSettings.addSettingsUpdateConsumer(RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING, this::setMigrationDirection);
clusterSettings.addSettingsUpdateConsumer(
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
this::setCompatibilityMode
);
}

private void setMigrationDirection(RemoteStoreNodeService.Direction migrationDirection) {
this.migrationDirection = migrationDirection;
}

private void setCompatibilityMode(RemoteStoreNodeService.CompatibilityMode compatibilityMode) {
this.compatibilityMode = compatibilityMode;
}

@Override
Expand All @@ -127,10 +147,28 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
"initial allocation of the shrunken index is only allowed on nodes [%s] that hold a copy of every shard in the index";
return allocation.decision(Decision.NO, NAME, explanation, initialRecoveryFilters);
}

Decision decision = isRemoteStoreMigrationReplicaDecision(shardRouting, allocation);
if (decision != null) return decision;
}
return shouldFilter(shardRouting, node.node(), allocation);
}

public Decision isRemoteStoreMigrationReplicaDecision(ShardRouting shardRouting, RoutingAllocation allocation) {
assert shardRouting.unassigned();
boolean primaryOnRemote = RemoteStoreMigrationAllocationDecider.isPrimaryOnRemote(shardRouting.shardId(), allocation);
if (shardRouting.primary() == false
&& shardRouting.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED
&& (compatibilityMode.equals(RemoteStoreNodeService.CompatibilityMode.MIXED))
&& (migrationDirection.equals(RemoteStoreNodeService.Direction.REMOTE_STORE))
&& primaryOnRemote == false) {
String explanation =
"in remote store migration, allocation filters are not applicable for replica copies whose primary is on doc rep node";
return allocation.decision(Decision.YES, NAME, explanation);
}
return null;
}

@Override
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
return shouldFilter(indexMetadata, node.node(), allocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
import org.opensearch.node.remotestore.RemoteStoreNodeService.Direction;
Expand All @@ -60,9 +61,8 @@ public class RemoteStoreMigrationAllocationDecider extends AllocationDecider {

public static final String NAME = "remote_store_migration";

private Direction migrationDirection;
private CompatibilityMode compatibilityMode;
private boolean remoteStoreBackedIndex;
volatile private Direction migrationDirection;
volatile private CompatibilityMode compatibilityMode;

public RemoteStoreMigrationAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings);
Expand Down Expand Up @@ -106,9 +106,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

// check for remote store backed indices
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
if (IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.exists(indexMetadata.getSettings())) {
remoteStoreBackedIndex = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings());
}
boolean remoteStoreBackedIndex = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings());
if (remoteStoreBackedIndex && targetNode.isRemoteStoreNode() == false) {
// allocations and relocations must be to a remote node
String reason = String.format(
Expand All @@ -133,15 +131,20 @@ private Decision primaryShardDecision(ShardRouting primaryShardRouting, Discover
return allocation.decision(Decision.YES, NAME, getDecisionDetails(true, primaryShardRouting, targetNode, ""));
}

// Checks if primary shard is on a remote node.
static boolean isPrimaryOnRemote(ShardId shardId, RoutingAllocation allocation) {
ShardRouting primaryShardRouting = allocation.routingNodes().activePrimary(shardId);
if (primaryShardRouting != null) {
DiscoveryNode primaryShardNode = allocation.nodes().getNodes().get(primaryShardRouting.currentNodeId());
return primaryShardNode.isRemoteStoreNode();
}
return false;
}

private Decision replicaShardDecision(ShardRouting replicaShardRouting, DiscoveryNode targetNode, RoutingAllocation allocation) {
if (targetNode.isRemoteStoreNode()) {
ShardRouting primaryShardRouting = allocation.routingNodes().activePrimary(replicaShardRouting.shardId());
boolean primaryHasMigratedToRemote = false;
if (primaryShardRouting != null) {
DiscoveryNode primaryShardNode = allocation.nodes().getNodes().get(primaryShardRouting.currentNodeId());
primaryHasMigratedToRemote = primaryShardNode.isRemoteStoreNode();
}
if (primaryHasMigratedToRemote == false) {
boolean primaryOnRemote = RemoteStoreMigrationAllocationDecider.isPrimaryOnRemote(replicaShardRouting.shardId(), allocation);
if (primaryOnRemote == false) {
return allocation.decision(
Decision.NO,
NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
Expand All @@ -50,6 +51,8 @@
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.snapshots.EmptySnapshotsInfoService;
import org.opensearch.test.gateway.TestGatewayAllocator;

Expand All @@ -61,6 +64,9 @@
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;

public class FilterAllocationDeciderTests extends OpenSearchAllocationTestCase {

Expand Down Expand Up @@ -406,4 +412,57 @@ public void testWildcardIPFilter() {
"test ip validation"
);
}

public void testMixedModeRemoteStoreAllocation() {
// For mixed mode remote store direction cluster's existing indices replica creation ,
// we don't consider filter allocation decider for replica of existing indices
FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build());
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
Settings initialSettings = Settings.builder()
.put("cluster.routing.allocation.exclude._id", "node2")
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED)
.put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE)
.build();

FilterAllocationDecider filterAllocationDecider = new FilterAllocationDecider(initialSettings, clusterSettings);
AllocationDeciders allocationDeciders = new AllocationDeciders(
Arrays.asList(
filterAllocationDecider,
new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
new ReplicaAfterPrimaryActiveAllocationDecider()
)
);
AllocationService service = new AllocationService(
allocationDeciders,
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE
);
ClusterState state = createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY);
RoutingTable routingTable = state.routingTable();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0);
allocation.debugDecision(true);
ShardRouting sr = ShardRouting.newUnassigned(
routingTable.index("sourceIndex").shard(0).shardId(),
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "")
);
Decision.Single decision = (Decision.Single) filterAllocationDecider.canAllocate(
sr,
state.getRoutingNodes().node("node2"),
allocation
);
assertEquals(decision.toString(), Type.YES, decision.type());

sr = ShardRouting.newUnassigned(
routingTable.index("sourceIndex").shard(0).shardId(),
false,
RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
);
decision = (Decision.Single) filterAllocationDecider.canAllocate(sr, state.getRoutingNodes().node("node2"), allocation);
assertEquals(decision.toString(), Type.NO, decision.type());
}
}

0 comments on commit 6532caa

Please sign in to comment.