Skip to content

Commit

Permalink
Adding UTs and removing seeding metadata change
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 committed Apr 17, 2024
1 parent b99fb0d commit c11bf32
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,23 @@
package org.opensearch.remotemigration;

import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.REMOTE_STORE_CUSTOM_KEY;
import static org.opensearch.cluster.metadata.IndexMetadata.REMOTE_STORE_SEEDED_SHARDS_KEY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -59,6 +53,10 @@ public void testIndexSettingsUpdateAfterIndexMovedToRemoteThroughAllocationExclu
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.build();
createIndexAndAssertDocrepProperties(indexName, oneReplica);

AsyncIndexingService asyncIndexingService = new AsyncIndexingService(indexName);
asyncIndexingService.startIndexing();

String replicationType;
GetSettingsResponse response;
String remoteStoreEnabled;
Expand All @@ -71,6 +69,7 @@ public void testIndexSettingsUpdateAfterIndexMovedToRemoteThroughAllocationExclu
excludeNodeSet("type", "docrep");
waitForRelocation();
waitNoPendingTasksOnAll();
asyncIndexingService.stopIndexing();

response = internalCluster().client().admin().indices().prepareGetSettings(indexName).get();
remoteStoreEnabled = response.getSetting(indexName, SETTING_REMOTE_STORE_ENABLED);
Expand All @@ -93,6 +92,12 @@ public void testIndexSettingsUpdateAfterIndexMovedToRemoteThroughManualReroute()
createIndexAndAssertDocrepProperties(indexName1, oneReplica);
createIndexAndAssertDocrepProperties(indexName2, oneReplica);

AsyncIndexingService indexOne = new AsyncIndexingService(indexName1);
indexOne.startIndexing();

AsyncIndexingService indexTwo = new AsyncIndexingService(indexName2);
indexTwo.startIndexing();

initDocRepToRemoteMigration();
stopShardRebalancing();

Expand Down Expand Up @@ -137,6 +142,9 @@ public void testIndexSettingsUpdateAfterIndexMovedToRemoteThroughManualReroute()
waitForRelocation();
waitNoPendingTasksOnAll();

indexOne.stopIndexing();
indexTwo.stopIndexing();

assertRemoteProperties(indexName1);
assertDocrepProperties(indexName2);
}
Expand All @@ -149,15 +157,15 @@ public void testIndexSettingsUpdatedOnlyForMigratingIndex() throws Exception {
internalCluster().validateClusterFormed();

String indexName = "migration-index";
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
);
indexBulk(indexName, 100);
refresh(indexName);
ensureGreen(indexName);
Settings oneReplica = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.build();
createIndexAndAssertDocrepProperties(indexName, oneReplica);

AsyncIndexingService indexingService = new AsyncIndexingService(indexName);
indexingService.startIndexing();

assertDocrepProperties(indexName);
long initalMetadataVersion = internalCluster().client()
.admin()
.cluster()
Expand All @@ -177,6 +185,8 @@ public void testIndexSettingsUpdatedOnlyForMigratingIndex() throws Exception {

waitForRelocation();
waitNoPendingTasksOnAll();
indexingService.stopIndexing();

assertRemoteProperties(indexName);
assertTrue(
initalMetadataVersion < internalCluster().client()
Expand Down Expand Up @@ -218,88 +228,8 @@ initalMetadataVersion < internalCluster().client()
);
}

@TestLogging(reason = "", value = "org.opensearch.cluster.metadata:TRACE,org.opensearch.cluster.action.shard:TRACE")
public void testCustomSeedingMetadata() throws Exception {
String indexName = "custom-seeding-metadata-index";
internalCluster().startClusterManagerOnlyNode();
List<String> docRepNodeNames = internalCluster().startNodes(2, Settings.builder().put("node.attr._type", "docrep").build());

// create index with 2 primaries
Settings zeroReplica = Settings.builder().put("number_of_replicas", 0).put("number_of_shards", 4).build();
createIndex(indexName, zeroReplica);
indexBulk(indexName, 100);
ensureGreen(indexName);

stopShardRebalancing();
initDocRepToRemoteMigration();
// add remote node in mixed mode cluster
addRemote = true;
List<String> remoteNodeNames = internalCluster().startNodes(2, Settings.builder().put("node.attr._type", "remote").build());
internalCluster().validateClusterFormed();

assertEquals(
internalCluster().client()
.admin()
.cluster()
.prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME)
.get()
.repositories()
.size(),
2
);

assertAcked(
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(indexName, 0, primaryNodeName(indexName, 0), remoteNodeNames.get(0)))
.add(new MoveAllocationCommand(indexName, 1, primaryNodeName(indexName, 1), remoteNodeNames.get(0)))
.add(new MoveAllocationCommand(indexName, 2, primaryNodeName(indexName, 2), remoteNodeNames.get(1)))
.execute()
.actionGet()
);
waitForRelocation();
ensureGreen(indexName);
waitNoPendingTasksOnAll();

ClusterState clusterState = internalCluster().client().admin().cluster().prepareState().get().getState();
IndexMetadata indexMetadata = clusterState.metadata().index(indexName);
HashSet<String> seededShards = new HashSet<>(
Strings.commaDelimitedListToSet(
indexMetadata.getCustomData(REMOTE_STORE_CUSTOM_KEY).get(IndexMetadata.REMOTE_STORE_SEEDED_SHARDS_KEY)
)
);
HashSet<String> expectedSeededShards = new HashSet<>();
for (int i = 0; i < 3; i++) {
expectedSeededShards.add("[" + indexName + "][" + i + "]");
}
assertEquals(expectedSeededShards, seededShards);

assertAcked(
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(indexName, 3, primaryNodeName(indexName, 3), remoteNodeNames.get(1)))
.execute()
.actionGet()
);
waitForRelocation();
ensureGreen(indexName);
waitNoPendingTasksOnAll();

clusterState = internalCluster().client().admin().cluster().prepareState().get().getState();
indexMetadata = clusterState.metadata().index(indexName);
Map<String, String> currentCustomData = indexMetadata.getCustomData(REMOTE_STORE_CUSTOM_KEY);
assertFalse(currentCustomData.containsKey(REMOTE_STORE_SEEDED_SHARDS_KEY));
assertTrue(
currentCustomData.containsKey(RemoteStoreEnums.PathType.NAME)
&& currentCustomData.containsKey(RemoteStoreEnums.PathHashAlgorithm.NAME)
);
}

private void createIndexAndAssertDocrepProperties(String index, Settings settings) throws Exception {
private void createIndexAndAssertDocrepProperties(String index, Settings settings) {
createIndex(index, settings);
indexBulk(index, 100);
refresh(index);
ensureGreen(index);
assertDocrepProperties(index);
Expand All @@ -309,15 +239,23 @@ private void assertDocrepProperties(String index) {
GetSettingsResponse response = internalCluster().client().admin().indices().prepareGetSettings(index).get();
String remoteStoreEnabled = response.getSetting(index, SETTING_REMOTE_STORE_ENABLED);
String replicationType = response.getSetting(index, SETTING_REPLICATION_TYPE);
String segmentRepo = response.getSetting(index, SETTING_REMOTE_SEGMENT_STORE_REPOSITORY);
String translogRepo = response.getSetting(index, SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
assertNull(remoteStoreEnabled);
assertNull(segmentRepo);
assertNull(translogRepo);
assertEquals(replicationType, "DOCUMENT");
}

private void assertRemoteProperties(String index) {
GetSettingsResponse response = internalCluster().client().admin().indices().prepareGetSettings(index).get();
String remoteStoreEnabled = response.getSetting(index, SETTING_REMOTE_STORE_ENABLED);
String replicationType = response.getSetting(index, SETTING_REPLICATION_TYPE);
String segmentRepo = response.getSetting(index, SETTING_REMOTE_SEGMENT_STORE_REPOSITORY);
String translogRepo = response.getSetting(index, SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
assertEquals(remoteStoreEnabled, "true");
assertNotNull(segmentRepo);
assertNotNull(translogRepo);
assertEquals(replicationType, "SEGMENT");
}

Expand Down
Loading

0 comments on commit c11bf32

Please sign in to comment.