Skip to content

Commit

Permalink
Support index level allocation filtering for searchable snapshot index (
Browse files Browse the repository at this point in the history
opensearch-project#11563)

* support index level allocation filtering

Signed-off-by: panguixin <[email protected]>

* run spotless

Signed-off-by: panguixin <[email protected]>

* fix IT

Signed-off-by: panguixin <[email protected]>

* fix not statement and add change log

Signed-off-by: panguixin <[email protected]>

---------

Signed-off-by: panguixin <[email protected]>
  • Loading branch information
bugmakerrrrrr authored and rayshrey committed Mar 18, 2024
1 parent 6b8202f commit 503bcf5
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 116 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add copy ingest processor ([#11870](https://github.com/opensearch-project/OpenSearch/pull/11870))
- Introduce new feature flag "WRITEABLE_REMOTE_INDEX" to gate the writeable remote index functionality ([#11717](https://github.com/opensearch-project/OpenSearch/pull/11170))
- Bump OpenTelemetry from 1.32.0 to 1.34.1 ([#11891](https://github.com/opensearch-project/OpenSearch/pull/11891))
- Support index level allocation filtering for searchable snapshot index ([#11563](https://github.com/opensearch-project/OpenSearch/pull/11563))

### Dependencies
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
Expand All @@ -25,11 +26,14 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Priority;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexModule;
Expand All @@ -47,6 +51,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -234,6 +240,62 @@ public void testSearchableSnapshotAllocationForLocalAndRemoteShardsOnSameNode()
assertDocCount(indexName, 100L);
}

public void testSearchableSnapshotAllocationFilterSettings() throws Exception {
final int numShardsIndex = randomIntBetween(3, 6);
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final String repoName = "test-repo";
final String snapshotName = "test-snap";
final Client client = client();

internalCluster().ensureAtLeastNumSearchAndDataNodes(numShardsIndex);
createIndexWithDocsAndEnsureGreen(numShardsIndex, 1, 100, indexName);
createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName);

restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertRemoteSnapshotIndexSettings(client, restoredIndexName);
final Set<String> searchNodes = StreamSupport.stream(clusterService().state().getNodes().spliterator(), false)
.filter(DiscoveryNode::isSearchNode)
.map(DiscoveryNode::getId)
.collect(Collectors.toSet());

for (int i = searchNodes.size(); i > 2; --i) {
String pickedNode = randomFrom(searchNodes);
searchNodes.remove(pickedNode);
assertIndexAssignedToNodeOrNot(restoredIndexName, pickedNode, true);
assertTrue(
client.admin()
.indices()
.prepareUpdateSettings(restoredIndexName)
.setSettings(Settings.builder().put("index.routing.allocation.exclude._id", pickedNode))
.execute()
.actionGet()
.isAcknowledged()
);
ClusterHealthResponse clusterHealthResponse = client.admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(new TimeValue(5, TimeUnit.MINUTES))
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
assertIndexAssignedToNodeOrNot(restoredIndexName, pickedNode, false);
assertIndexAssignedToNodeOrNot(indexName, pickedNode, true);
}
}

private void assertIndexAssignedToNodeOrNot(String index, String node, boolean assigned) {
final ClusterState state = clusterService().state();
if (assigned) {
assertTrue(state.getRoutingTable().allShards(index).stream().anyMatch(shard -> shard.currentNodeId().equals(node)));
} else {
assertTrue(state.getRoutingTable().allShards(index).stream().noneMatch(shard -> shard.currentNodeId().equals(node)));
}
}

/**
* Tests the functionality of remote shard allocation to
* ensure it can handle node drops for failover scenarios and the cluster gets back to a healthy state when
Expand Down Expand Up @@ -341,11 +403,16 @@ public void testDeleteSearchableSnapshotBackingIndex() throws Exception {
}

private void createIndexWithDocsAndEnsureGreen(int numReplicasIndex, int numOfDocs, String indexName) throws InterruptedException {
createIndexWithDocsAndEnsureGreen(1, numReplicasIndex, numOfDocs, indexName);
}

private void createIndexWithDocsAndEnsureGreen(int numShardsIndex, int numReplicasIndex, int numOfDocs, String indexName)
throws InterruptedException {
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsIndex)
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey())
.build()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class TransportUpdateSettingsAction extends TransportClusterManagerNodeAc
"index.number_of_replicas"
);

private final static String[] ALLOWLIST_REMOTE_SNAPSHOT_SETTINGS_PREFIXES = { "index.search.slowlog" };
private final static String[] ALLOWLIST_REMOTE_SNAPSHOT_SETTINGS_PREFIXES = { "index.search.slowlog", "index.routing.allocation" };

private final MetadataUpdateSettingsService updateSettingsService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,23 +729,6 @@ assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) ==
+ " was matched but wasn't removed";
}

public void swapPrimaryWithReplica(
Logger logger,
ShardRouting primaryShard,
ShardRouting replicaShard,
RoutingChangesObserver changes
) {
assert primaryShard.primary() : "Invalid primary shard provided";
assert !replicaShard.primary() : "Invalid Replica shard provided";

ShardRouting newPrimary = primaryShard.moveActivePrimaryToReplica();
ShardRouting newReplica = replicaShard.moveActiveReplicaToPrimary();
updateAssigned(primaryShard, newPrimary);
updateAssigned(replicaShard, newReplica);
logger.info("Swap relocation performed for shard [{}]", newPrimary.shortSummary());
changes.replicaPromoted(newPrimary);
}

private void unassignPrimaryAndPromoteActiveReplicaIfExists(
ShardRouting failedShard,
UnassignedInfo unassignedInfo,
Expand Down
Loading

0 comments on commit 503bcf5

Please sign in to comment.