Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify diff calculation for remote routing table #15100

Merged
merged 8 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
Expand All @@ -46,7 +47,7 @@
public class RemoteRoutingTableServiceIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAME = "test-index";
private static final String INDEX_NAME_1 = "test-index-1";
BlobPath indexRoutingPath;
List<BlobPath> indexRoutingPaths;
AtomicInteger indexRoutingFiles = new AtomicInteger();
private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX;

Expand Down Expand Up @@ -91,7 +92,7 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
updateIndexSettings(INDEX_NAME, IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2);
ensureGreen(INDEX_NAME);
assertBusy(() -> {
int indexRoutingFilesAfterUpdate = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
int indexRoutingFilesAfterUpdate = repository.blobStore().blobContainer(indexRoutingPaths.get(0)).listBlobs().size();
// At-least 3 new index routing files will be created as shards will transition from INIT -> UNASSIGNED -> STARTED state
assertTrue(indexRoutingFilesAfterUpdate >= indexRoutingFiles.get() + 3);
});
Expand All @@ -112,6 +113,47 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception {
assertTrue(areRoutingTablesSame(routingTableVersions));
}

public void testRemoteRoutingTableWithMultipleIndex() throws Exception {
BlobStoreRepository repository = prepareClusterAndVerifyRepository();

RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);
RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);
List<String> expectedIndexNames = new ArrayList<>();
List<String> deletedIndexNames = new ArrayList<>();
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true);

List<RoutingTable> routingTables = getRoutingTableFromAllNodes();
// Verify indices in routing table
Set<String> expectedIndicesInRoutingTable = Set.of(INDEX_NAME);
assertEquals(routingTables.get(0).getIndicesRouting().keySet(), expectedIndicesInRoutingTable);
// Verify routing table across all nodes is equal
assertTrue(areRoutingTablesSame(routingTables));

// Create new index
createIndex(INDEX_NAME_1, remoteStoreIndexSettings(1, 5));
ensureGreen(INDEX_NAME_1);

latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);

updateIndexRoutingPaths(repository);
verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 2, deletedIndexNames, true);
routingTables = getRoutingTableFromAllNodes();
// Verify indices in routing table
expectedIndicesInRoutingTable = Set.of(INDEX_NAME, INDEX_NAME_1);
assertEquals(routingTables.get(0).getIndicesRouting().keySet(), expectedIndicesInRoutingTable);
// Verify routing table across all nodes is equal
assertTrue(areRoutingTablesSame(routingTables));
}

public void testRemoteRoutingTableEmptyRoutingTableDiff() throws Exception {
prepareClusterAndVerifyRepository();

Expand Down Expand Up @@ -166,7 +208,7 @@ public void testRemoteRoutingTableIndexNodeRestart() throws Exception {
assertRemoteStoreRepositoryOnAllNodes(REMOTE_ROUTING_TABLE_REPO);

assertBusy(() -> {
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPaths.get(0)).listBlobs().size();
assertTrue(indexRoutingFilesAfterNodeDrop > indexRoutingFiles.get());
});

Expand Down Expand Up @@ -201,7 +243,7 @@ public void testRemoteRoutingTableIndexMasterRestart() throws Exception {
assertRemoteStoreRepositoryOnAllNodes(REMOTE_ROUTING_TABLE_REPO);

assertBusy(() -> {
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size();
int indexRoutingFilesAfterNodeDrop = repository.blobStore().blobContainer(indexRoutingPaths.get(0)).listBlobs().size();
assertTrue(indexRoutingFilesAfterNodeDrop > indexRoutingFiles.get());
});

Expand Down Expand Up @@ -240,10 +282,14 @@ private BlobStoreRepository prepareClusterAndVerifyRepository() throws Exception

BlobPath baseMetadataPath = getBaseMetadataPath(repository);
List<IndexRoutingTable> indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values());
indexRoutingPath = getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_TABLE), indexRoutingTables.get(0).getIndex().getUUID());
indexRoutingPaths = new ArrayList<>();
for (IndexRoutingTable indexRoutingTable : indexRoutingTables) {
indexRoutingPaths.add(getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_TABLE), indexRoutingTable.getIndex().getUUID()));
}

assertBusy(() -> {
indexRoutingFiles.set(repository.blobStore().blobContainer(indexRoutingPath).listBlobs().size());
int totalRoutingFiles = calculateTotalRoutingFiles(repository);
indexRoutingFiles.set(totalRoutingFiles);
// There would be >=3 files as shards will transition from UNASSIGNED -> INIT -> STARTED state
assertTrue(indexRoutingFiles.get() >= 3);
});
Expand Down Expand Up @@ -280,11 +326,19 @@ private void verifyUpdatesInManifestFile(
assertTrue(latestManifest.isPresent());
ClusterMetadataManifest manifest = latestManifest.get();

assertEquals(expectedIndexNames, manifest.getDiffManifest().getIndicesRoutingUpdated());
assertEquals(expectedDeletedIndex, manifest.getDiffManifest().getIndicesDeleted());
assertEquals(expectedIndicesRoutingFilesInManifest, manifest.getIndicesRouting().size());

// Check if all paths in manifest.getIndicesRouting() are present in indexRoutingPaths
for (ClusterMetadataManifest.UploadedIndexMetadata uploadedFilename : manifest.getIndicesRouting()) {
assertTrue(uploadedFilename.getUploadedFilename().contains(indexRoutingPath.buildAsString()));
boolean pathFound = false;
for (BlobPath indexRoutingPath : indexRoutingPaths) {
if (uploadedFilename.getUploadedFilename().contains(indexRoutingPath.buildAsString())) {
pathFound = true;
break;
}
}
assertTrue("Uploaded file not found in indexRoutingPaths: " + uploadedFilename.getUploadedFilename(), pathFound);
}
assertEquals(isRoutingTableDiffFileExpected, manifest.getDiffManifest().getIndicesRoutingDiffPath() != null);
}
Expand All @@ -305,6 +359,24 @@ private List<RoutingTable> getRoutingTableFromAllNodes() throws ExecutionExcepti
return routingTables;
}

private void updateIndexRoutingPaths(BlobStoreRepository repository) {
BlobPath baseMetadataPath = getBaseMetadataPath(repository);
List<IndexRoutingTable> indexRoutingTables = new ArrayList<>(getClusterState().routingTable().indicesRouting().values());

indexRoutingPaths.clear(); // Clear the list to avoid stale data
for (IndexRoutingTable indexRoutingTable : indexRoutingTables) {
indexRoutingPaths.add(getIndexRoutingPath(baseMetadataPath.add(INDEX_ROUTING_TABLE), indexRoutingTable.getIndex().getUUID()));
}
}

private int calculateTotalRoutingFiles(BlobStoreRepository repository) throws IOException {
int totalRoutingFiles = 0;
for (BlobPath path : indexRoutingPaths) {
totalRoutingFiles += repository.blobStore().blobContainer(path).listBlobs().size();
}
return totalRoutingFiles;
}

private boolean areRoutingTablesSame(List<RoutingTable> routingTables) {
if (routingTables == null || routingTables.isEmpty()) {
return false;
Expand Down Expand Up @@ -356,7 +428,6 @@ private void deleteIndexAndVerify(RemoteManifestManager remoteManifestManager) {
);
assertTrue(latestManifest.isPresent());
ClusterMetadataManifest manifest = latestManifest.get();
assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().isEmpty());
assertTrue(manifest.getDiffManifest().getIndicesDeleted().contains(INDEX_NAME));
assertTrue(manifest.getIndicesRouting().isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -75,7 +77,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class IndexShardRoutingTable implements Iterable<ShardRouting> {
public class IndexShardRoutingTable extends AbstractDiffable<IndexShardRoutingTable> implements Iterable<ShardRouting> {

final ShardShuffler shuffler;
// Shuffler for weighted round-robin shard routing. This uses rotation to permute shards.
Expand Down Expand Up @@ -545,6 +547,12 @@ private static List<ShardRouting> rankShardsAndUpdateStats(
return sortedShards;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
this.shardId().getIndex().writeTo(out);
Builder.writeToThin(this, out);
}

private static class NodeRankComparator implements Comparator<ShardRouting> {
private final Map<String, Double> nodeRanks;

Expand Down Expand Up @@ -1067,6 +1075,14 @@ private void populateInitializingShardWeightsMap(WeightedRouting weightedRouting
}
}

public static IndexShardRoutingTable readFrom(StreamInput in) throws IOException {
return IndexShardRoutingTable.Builder.readFrom(in);
}

public static Diff<IndexShardRoutingTable> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(IndexShardRoutingTable::readFrom, in);
}

/**
* Builder of an index shard routing table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@
return new RoutingTableDiff(previousState, this);
}

public Diff<RoutingTable> incrementalDiff(RoutingTable previousState) {
return new RoutingTableIncrementalDiff(previousState, this);
}

public static Diff<RoutingTable> readDiffFrom(StreamInput in) throws IOException {
return new RoutingTableDiff(in);
}
Expand All @@ -403,7 +407,7 @@
}
}

private static class RoutingTableDiff implements Diff<RoutingTable> {
private static class RoutingTableDiff implements Diff<RoutingTable>, StringKeyDiffProvider<IndexRoutingTable> {

private final long version;

Expand Down Expand Up @@ -432,6 +436,11 @@
out.writeLong(version);
indicesRouting.writeTo(out);
}

@Override
public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> provideDiff() {
return (DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>>) indicesRouting;

Check warning on line 442 in server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java#L442

Added line #L442 was not covered by tests
}
}

public static Builder builder() {
Expand Down
Loading
Loading