Skip to content

Commit

Permalink
Simplify diff calculation for remote routing table (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#15100)

Simplify diff calculation logic for remote routing table

Signed-off-by: Bukhtawar Khan <[email protected]>
Co-authored-by: Shailendra Singh <[email protected]>
  • Loading branch information
2 people authored and akolarkunnu committed Sep 10, 2024
1 parent d65a085 commit 9d91758
Show file tree
Hide file tree
Showing 20 changed files with 975 additions and 530 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
Expand All @@ -46,7 +47,7 @@
public class RemoteRoutingTableServiceIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAME = "test-index";
private static final String INDEX_NAME_1 = "test-index-1";
BlobPath indexRoutingPath;
List<BlobPath> indexRoutingPaths;
AtomicInteger indexRoutingFiles = new AtomicInteger();
private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX;

Expand Down Expand Up @@ -91,7 +92,7 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
updateIndexSettings(INDEX_NAME, IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2);
ensureGreen(INDEX_NAME);
assertBusy(() -> {
int indexRoutingFilesAfterUpdate = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
int indexRoutingFilesAfterUpdate = repository.blobStore().blobContainer(indexRoutingPaths.get(0)).listBlobs().size();
// At-least 3 new index routing files will be created as shards will transition from INIT -> UNASSIGNED -> STARTED state
assertTrue(indexRoutingFilesAfterUpdate >= indexRoutingFiles.get() + 3);
});
Expand All @@ -112,6 +113,47 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
assertTrue(areRoutingTablesSame(routingTableVersions));
}

public void testRemoteRoutingTableWithMultipleIndex() throws Exception {
BlobStoreRepository repository = prepareClusterAndVerifyRepository();

RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
List<String> expectedIndexNames = new ArrayList<>();
List<String> deletedIndexNames = new ArrayList<>();
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);

List<RoutingTable> routingTables = getRoutingTableFromAllNodes();
// Verify indices in routing table
Set<String> expectedIndicesInRoutingTable = Set.of(INDEX_NAME);
assertEquals(routingTables.get(0).getIndicesRouting().keySet(), expectedIndicesInRoutingTable);
// Verify routing table across all nodes is equal
assertTrue(areRoutingTablesSame(routingTables));

// Create new index
createIndex(INDEX_NAME_1, remoteStoreIndexSettings(1, 5));
ensureGreen(INDEX_NAME_1);

latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);

updateIndexRoutingPaths(repository);
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 2, deletedIndexNames, true);
routingTables = getRoutingTableFromAllNodes();
// Verify indices in routing table
expectedIndicesInRoutingTable = Set.of(INDEX_NAME, INDEX_NAME_1);
assertEquals(routingTables.get(0).getIndicesRouting().keySet(), expectedIndicesInRoutingTable);
// Verify routing table across all nodes is equal
assertTrue(areRoutingTablesSame(routingTables));
}

public void testRemoteRoutingTableEmptyRoutingTableDiff() throws Exception {
prepareClusterAndVerifyRepository();

Expand Down Expand Up @@ -166,7 +208,7 @@ public void testRemoteRoutingTableIndexNodeRestart() throws Exception {
assertRemoteStoreRepositoryOnAllNodes(REMOTE_ROUTING_TABLE_REPO);

assertBusy(() -> {
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPaths.get(0)).listBlobs().size();
assertTrue(indexRoutingFilesAfterNodeDrop > indexRoutingFiles.get());
});

Expand Down Expand Up @@ -201,7 +243,7 @@ public void testRemoteRoutingTableIndexMasterRestart() throws Exception {
assertRemoteStoreRepositoryOnAllNodes(REMOTE_ROUTING_TABLE_REPO);

assertBusy(() -> {
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPaths.get(0)).listBlobs().size();
assertTrue(indexRoutingFilesAfterNodeDrop > indexRoutingFiles.get());
});

Expand Down Expand Up @@ -240,10 +282,14 @@ private BlobStoreRepository prepareClusterAndVerifyRepository() throws Exception

BlobPath baseMetadataPath = getBaseMetadataPath(repository);
List<IndexRoutingTable> indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values());
indexRoutingPath = getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_TABLE), indexRoutingTables.get(0).getIndex().getUUID());
indexRoutingPaths = new ArrayList<>();
for (IndexRoutingTable indexRoutingTable : indexRoutingTables) {
indexRoutingPaths.add(getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_TABLE), indexRoutingTable.getIndex().getUUID()));
}

assertBusy(() -> {
indexRoutingFiles.set(repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size());
int totalRoutingFiles = calculateTotalRoutingFiles(repository);
indexRoutingFiles.set(totalRoutingFiles);
// There would be >=3 files as shards will transition from UNASSIGNED -> INIT -> STARTED state
assertTrue(indexRoutingFiles.get() >= 3);
});
Expand Down Expand Up @@ -280,11 +326,19 @@ private void verifyUpdatesInManifestFile(
assertTrue(latestManifest.isPresent());
ClusterMetadataManifest manifest = latestManifest.get();

assertEquals(expectedIndexNames, manifest.getDiffManifest().getIndicesRoutingUpdated());
assertEquals(expectedDeletedIndex, manifest.getDiffManifest().getIndicesDeleted());
assertEquals(expectedIndicesRoutingFilesInManifest, manifest.getIndicesRouting().size());

// Check if all paths in manifest.getIndicesRouting() are present in indexRoutingPaths
for (ClusterMetadataManifest.UploadedIndexMetadata uploadedFilename : manifest.getIndicesRouting()) {
assertTrue(uploadedFilename.getUploadedFilename().contains(indexRoutingPath.buildAsString()));
boolean pathFound = false;
for (BlobPath indexRoutingPath : indexRoutingPaths) {
if (uploadedFilename.getUploadedFilename().contains(indexRoutingPath.buildAsString())) {
pathFound = true;
break;
}
}
assertTrue("Uploaded file not found in indexRoutingPaths: " + uploadedFilename.getUploadedFilename(), pathFound);
}
assertEquals(isRoutingTableDiffFileExpected, manifest.getDiffManifest().getIndicesRoutingDiffPath() != null);
}
Expand All @@ -305,6 +359,24 @@ private List<RoutingTable> getRoutingTableFromAllNodes() throws ExecutionExcepti
return routingTables;
}

private void updateIndexRoutingPaths(BlobStoreRepository repository) {
BlobPath baseMetadataPath = getBaseMetadataPath(repository);
List<IndexRoutingTable> indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values());

indexRoutingPaths.clear(); // Clear the list to avoid stale data
for (IndexRoutingTable indexRoutingTable : indexRoutingTables) {
indexRoutingPaths.add(getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_TABLE), indexRoutingTable.getIndex().getUUID()));
}
}

private int calculateTotalRoutingFiles(BlobStoreRepository repository) throws IOException {
int totalRoutingFiles = 0;
for (BlobPath path : indexRoutingPaths) {
totalRoutingFiles += repository.blobStore().blobContainer(path).listBlobs().size();
}
return totalRoutingFiles;
}

private boolean areRoutingTablesSame(List<RoutingTable> routingTables) {
if (routingTables == null || routingTables.isEmpty()) {
return false;
Expand Down Expand Up @@ -356,7 +428,6 @@ private void deleteIndexAndVerify(RemoteManifestManager remoteManifestManager) {
);
assertTrue(latestManifest.isPresent());
ClusterMetadataManifest manifest = latestManifest.get();
assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().isEmpty());
assertTrue(manifest.getDiffManifest().getIndicesDeleted().contains(INDEX_NAME));
assertTrue(manifest.getIndicesRouting().isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -75,7 +77,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class IndexShardRoutingTable implements Iterable<ShardRouting> {
public class IndexShardRoutingTable extends AbstractDiffable<IndexShardRoutingTable> implements Iterable<ShardRouting> {

final ShardShuffler shuffler;
// Shuffler for weighted round-robin shard routing. This uses rotation to permute shards.
Expand Down Expand Up @@ -545,6 +547,12 @@ private static List<ShardRouting> rankShardsAndUpdateStats(
return sortedShards;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
this.shardId().getIndex().writeTo(out);
Builder.writeToThin(this, out);
}

private static class NodeRankComparator implements Comparator<ShardRouting> {
private final Map<String, Double> nodeRanks;

Expand Down Expand Up @@ -1067,6 +1075,14 @@ private void populateInitializingShardWeightsMap(WeightedRouting weightedRouting
}
}

public static IndexShardRoutingTable readFrom(StreamInput in) throws IOException {
return IndexShardRoutingTable.Builder.readFrom(in);
}

public static Diff<IndexShardRoutingTable> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(IndexShardRoutingTable::readFrom, in);
}

/**
* Builder of an index shard routing table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ public Diff<RoutingTable> diff(RoutingTable previousState) {
return new RoutingTableDiff(previousState, this);
}

public Diff<RoutingTable> incrementalDiff(RoutingTable previousState) {
return new RoutingTableIncrementalDiff(previousState, this);
}

public static Diff<RoutingTable> readDiffFrom(StreamInput in) throws IOException {
return new RoutingTableDiff(in);
}
Expand All @@ -403,7 +407,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

private static class RoutingTableDiff implements Diff<RoutingTable> {
private static class RoutingTableDiff implements Diff<RoutingTable>, StringKeyDiffProvider<IndexRoutingTable> {

private final long version;

Expand Down Expand Up @@ -432,6 +436,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
indicesRouting.writeTo(out);
}

@Override
public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> provideDiff() {
return (DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>>) indicesRouting;
}
}

public static Builder builder() {
Expand Down
Loading

0 comments on commit 9d91758

Please sign in to comment.